Prerequisites:
- module-8/03-binlog-retention-heartbeat
Настройка MySQL CDC коннектора
От теории к практике
В предыдущих трех уроках мы подробно изучили MySQL binlog: архитектуру, GTID mode, retention и heartbeat механизмы. Теперь пришло время применить это знание и настроить полноценный CDC connector для чтения изменений из MySQL.
Почему MySQL connector отличается от PostgreSQL?
Если вы прошли Module 2 (PostgreSQL CDC), вы уже знаете паттерн развертывания Debezium connector через Kafka Connect REST API. Но MySQL connector имеет критические архитектурные отличия:
PostgreSQL (server-side tracking):
- Replication slot хранит LSN position на сервере PostgreSQL
- Publication определяет, какие таблицы реплицировать
- Position tracking автоматически управляется через replication slot
MySQL (client-side tracking):
- Нет replication slot - позиция хранится в Kafka Connect offset storage
- database.server.id требуется для интеграции в MySQL cluster topology
- schema.history.internal.kafka.topic обязателен для восстановления схем после перезапуска
Эти отличия делают MySQL connector проще в некоторых аспектах (не нужно управлять слотами), но требуют дополнительной осторожности в других (schema history topic, server.id uniqueness).
Prerequisites Checklist
Перед началом убедитесь, что у вас выполнены шаги из Phase 12:
MySQL Configuration
- ✓ MySQL 8.0.40 запущен в Docker Compose
- ✓
binlog-format=ROW(проверено в Lesson 1) - ✓
gtid-mode=ON(настроено в Lesson 2) - ✓
binlog-expire-logs-seconds=604800(7 дней, Lesson 3) - ✓ Heartbeat таблица создана (Lesson 3)
Kafka Connect Service
- ✓ Debezium Connect контейнер запущен (порт 8083)
- ✓ Kafka cluster доступен (порт 9092)
Проверка готовности:
# MySQL готов?
docker compose exec mysql mysql -u root -pmysql -e "SELECT VERSION()"
# Kafka Connect доступен?
curl -s http://localhost:8083/ | jq .version
# Binlog ROW format?
docker compose exec mysql mysql -u root -pmysql -e "SHOW VARIABLES LIKE 'binlog_format'"
# GTID mode ON?
docker compose exec mysql mysql -u root -pmysql -e "SHOW VARIABLES LIKE 'gtid_mode'"
Если все команды выполнились успешно - вы готовы к настройке connector.
Schema History Topic: КРИТИЧЕСКАЯ подготовка
MySQL connector требует schema history topic для восстановления схем таблиц при перезапуске. Этот topic должен быть создан до развертывания connector с бесконечным retention.
Почему это критично?
- Schema history topic содержит все DDL statements (CREATE TABLE, ALTER TABLE) с binlog позициями
- Debezium использует этот topic для воссоздания схем при чтении старых binlog событий
- Если topic purged (удален через retention), connector не сможет перезапуститься
Что произойдет, если не настроить retention=-1?
Confluent Kafka по умолчанию хранит данные 7 дней (log.retention.ms=604800000). Через неделю:
- Schema history events purged
- Connector перезапускается (обновление, failover)
- Ошибка: “The db history topic or its content is fully or partially missing”
- Требуется полный resnapshot базы данных (часы/дни для больших баз)
Создание Schema History Topic
# Создать topic с infinite retention
docker compose exec kafka kafka-topics \
--bootstrap-server localhost:9092 \
--create \
--topic schema-changes.mysql-server \
--partitions 1 \
--replication-factor 1 \
--config retention.ms=-1 \
--config retention.bytes=-1
Параметры topic:
| Параметр | Значение | Обоснование |
|---|---|---|
--partitions | 1 | Обязательно single partition для сохранения порядка DDL |
--replication-factor | 1 | В dev среде достаточно. Production: 3 |
retention.ms | -1 | Infinite retention - данные хранятся вечно |
retention.bytes | -1 | No size limit - topic не ограничен по размеру |
Проверка конфигурации:
# Verify topic configuration
docker compose exec kafka kafka-topics \
--bootstrap-server localhost:9092 \
--describe \
--topic schema-changes.mysql-server
Ожидаемый output:
Topic: schema-changes.mysql-server PartitionCount: 1 ReplicationFactor: 1 Configs: retention.ms=-1,retention.bytes=-1
Topic: schema-changes.mysql-server Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Ключевые моменты:
retention.ms=-1(infinite retention)retention.bytes=-1(no size limit)PartitionCount: 1(single partition)
Рекомендуемый паттерн: schema-changes.{database.server.name}
Примеры:
schema-changes.mysql-server(generic MySQL server)schema-changes.aurora-production(Aurora production cluster)schema-changes.orders-db(orders database cluster)
Такой паттерн позволяет легко идентифицировать, какому connector принадлежит schema history topic.
MySQL Connector Configuration: Полный обзор свойств
Теперь создадим конфигурацию connector. Мы разберем каждое свойство с объяснением его роли.
Структура конфигурации
Создайте файл mysql-connector-config.json:
{
"name": "mysql-inventory-connector",
"config": {
// === CONNECTOR CLASS (Required) ===
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
// === DATABASE CONNECTION (Required) ===
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
// === MYSQL CLUSTER INTEGRATION (Required, MySQL-specific) ===
"database.server.id": "184054",
"database.server.name": "mysql-server",
// === TABLE FILTERING (Required) ===
"table.include.list": "inventory.customers,inventory.orders,inventory.products",
// === SCHEMA HISTORY (Required, MySQL-specific) ===
"schema.history.internal.kafka.topic": "schema-changes.mysql-server",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
// === SNAPSHOT CONFIGURATION (Optional) ===
"snapshot.mode": "initial",
// === HEARTBEAT (Recommended from Lesson 3) ===
"heartbeat.interval.ms": "10000",
"heartbeat.action.query": "INSERT INTO inventory.debezium_heartbeat (id, ts) VALUES (1, NOW()) ON DUPLICATE KEY UPDATE ts = NOW()"
}
}
Детальный разбор свойств
1. Connector Class
"connector.class": "io.debezium.connector.mysql.MySqlConnector"
Что это: Java класс, реализующий MySQL CDC connector.
Где находится: Bundled в Debezium Connect Docker image (quay.io/debezium/connect:2.5).
Альтернативы:
io.debezium.connector.postgresql.PostgresConnector(для PostgreSQL)io.debezium.connector.mongodb.MongoDbConnector(для MongoDB)
2. Database Connection Properties
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz"
Что это: Параметры подключения к MySQL.
Важно:
database.hostname: Используйте Docker service name (mysql), а неlocalhostdatabase.user: User должен иметь REPLICATION CLIENT, REPLICATION SLAVE, SELECT праваdatabase.password: В production используйте secrets management (Vault, AWS Secrets Manager)
Настройка debezium user (если еще не создан):
CREATE USER 'debezium'@'%' IDENTIFIED BY 'dbz';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
ON *.* TO 'debezium'@'%';
-- Права на heartbeat таблицу
GRANT INSERT, UPDATE ON inventory.debezium_heartbeat TO 'debezium'@'%';
FLUSH PRIVILEGES;
3. MySQL Cluster Integration
"database.server.id": "184054"Что это: Уникальный идентификатор Debezium connector в MySQL cluster topology.
Почему критично:
- MySQL binlog replication требует unique server ID для каждого участника cluster
- Если server.id совпадает с другим MySQL server или replica, возникает конфликт
- Конфликт приводит к ошибке: “Slave has the same server UUID as this server”
Как выбрать значение:
| Entity | Typical server.id Range | Example |
|---|---|---|
| MySQL Primary Server | 1-100 | 1 |
| MySQL Read Replica 1 | 101-200 | 101 |
| MySQL Read Replica 2 | 101-200 | 102 |
| Debezium Connector 1 | 184000-184999 | 184054 |
| Debezium Connector 2 | 184000-184999 | 184055 |
Проверка uniqueness:
-- На MySQL primary server
SHOW VARIABLES LIKE 'server_id';
-- Результат: 1 (из docker-compose.yml: --server-id=1)
-- На read replica (если есть)
SHOW VARIABLES LIKE 'server_id';
-- Результат: 101, 102, etc.Убедитесь, что database.server.id в connector config не совпадает ни с одним из этих значений.
"database.server.name": "mysql-server"
Что это: Логическое имя MySQL server, используется как prefix для Kafka topics.
Примеры topic naming:
| Table | Topic Name |
|---|---|
inventory.customers | mysql-server.inventory.customers |
inventory.orders | mysql-server.inventory.orders |
inventory.products | mysql-server.inventory.products |
Важно:
- После развертывания connector нельзя менять
database.server.name - Изменение приведет к созданию новых topics, loss of offset tracking
- Используйте meaningful names (например,
aurora-production,orders-db)
Проверка знанийПочему schema history topic необходимо создать с infinite retention (retention.ms=-1) ДО развёртывания MySQL коннектора?
4. Table Filtering
"table.include.list": "inventory.customers,inventory.orders,inventory.products"
Что это: Whitelist таблиц для CDC capture.
Формат: {database}.{table} через запятую.
Поддерживаемые паттерны:
// Все таблицы в database
"table.include.list": "inventory.*"
// Несколько database
"table.include.list": "inventory.*,analytics.*"
// Regex паттерны
"table.include.list": "inventory\\.orders_.*" // orders_2023, orders_2024, etc.
Alternative: Blacklist:
// Exclude определенные таблицы
"table.exclude.list": "inventory.audit_logs,inventory.temp_.*"
Рекомендация: Используйте explicit include list для production (избегайте wildcards, чтобы случайно не захватить лишние таблицы).
5. Schema History Configuration
"schema.history.internal.kafka.topic": "schema-changes.mysql-server",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092"
Что это: Конфигурация для записи DDL history в Kafka topic.
Почему нужно два свойства:
schema.history.internal.kafka.topic: Name of the topic (созданного ранее)schema.history.internal.kafka.bootstrap.servers: Kafka connection (может отличаться от main Kafka cluster)
Что записывается в schema history topic:
- Все DDL statements:
CREATE TABLE,ALTER TABLE ADD COLUMN,DROP COLUMN - Binlog position (file:offset или GTID) для каждого DDL statement
- Используется для recovery: При перезапуске Debezium “воспроизводит” DDL statements для воссоздания схем
Старое название (Debezium 1.x):
"database.history.internal.kafka.topic": "schema-changes.mysql-server"Новое название (Debezium 2.0+):
"schema.history.internal.kafka.topic": "schema-changes.mysql-server"Если вы видите database.history.* в старых туториалах - используйте schema.history.internal.* для Debezium 2.x.
6. Snapshot Configuration
"snapshot.mode": "initial"
Что это: Режим создания initial snapshot при первом запуске connector.
Режимы snapshot:
| Mode | Behavior | When to Use |
|---|---|---|
initial | Full snapshot при первом запуске, затем binlog streaming | Default, most common |
never | Пропустить snapshot, начать сразу с binlog | Deprecated, use no_data |
no_data | Только schema snapshot (no data), затем binlog | Data уже в Kafka topics |
when_needed | Snapshot только если нет saved offset | Resume after failure |
schema_only | Только CREATE TABLE statements, no data | Schema evolution testing |
Как работает initial snapshot:
op=r (read events)
op=c/u/d (real-time)
Duration snapshot для больших таблиц:
- 1 GB table ~ 2-5 минут
- 10 GB table ~ 20-50 минут
- 100 GB table ~ 3-8 часов
- 1 TB table ~ 1-3 дня
Для больших баз рекомендуется: Incremental snapshots (advanced topic, вне scope этого урока).
7. Heartbeat Configuration
"heartbeat.interval.ms": "10000",
"heartbeat.action.query": "INSERT INTO inventory.debezium_heartbeat (id, ts) VALUES (1, NOW()) ON DUPLICATE KEY UPDATE ts = NOW()"
Что это: Механизм предотвращения position loss на idle таблицах (подробно изучили в Lesson 3).
Параметры:
-
heartbeat.interval.ms: Интервал между heartbeat events (миллисекунды)- 10000ms (10 секунд) - хороший default
- Production: можно увеличить до 60000ms (1 минута)
-
heartbeat.action.query: SQL запрос для создания heartbeat event- Обновляет timestamp в
debezium_heartbeatтаблице ON DUPLICATE KEY UPDATEизбегает накопления записей
- Обновляет timestamp в
Без heartbeat: Idle таблицы могут привести к position loss при binlog purge.
С heartbeat: Offset постоянно обновляется, даже если таблицы idle.
Deploying Connector via REST API: Hands-On Lab
Теперь применим всю конфигурацию и развернем connector.
Step 1: Подготовка конфигурации
Создайте файл в директории labs:
cd labs
nano mysql-connector-config.json
Вставьте полную конфигурацию (см. выше раздел “Структура конфигурации”).
Сохраните файл (Ctrl+O, Enter, Ctrl+X).
Step 2: Deploy Connector
Отправьте POST request к Kafka Connect REST API:
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @mysql-connector-config.json
Ожидаемый успешный response:
{
"name": "mysql-inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
...
},
"tasks": [],
"type": "source"
}
Если ошибка: Проверьте синтаксис JSON (запятые, кавычки), убедитесь, что schema history topic создан.
Step 3: Проверка статуса connector
curl -s http://localhost:8083/connectors/mysql-inventory-connector/status | jq .
Успешный output:
{
"name": "mysql-inventory-connector",
"connector": {
"state": "RUNNING",
"worker_id": "connect:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "connect:8083"
}
],
"type": "source"
}
Ключевые индикаторы:
"state": "RUNNING"- connector работаетtasks[0].state: "RUNNING"- task выполняется- Если
"state": "FAILED"- проверьте логи:docker compose logs debezium-connect
Step 4: Verify Topics Created
docker compose exec kafka kafka-topics \
--bootstrap-server localhost:9092 \
--list | grep mysql-server
Ожидаемые topics:
mysql-server.inventory.customers
mysql-server.inventory.orders
mysql-server.inventory.products
Если topics не появились через 10-20 секунд:
- Проверьте connector status (Step 3)
- Проверьте логи:
docker compose logs debezium-connect | tail -50 - Убедитесь, что таблицы существуют в MySQL:
docker compose exec mysql mysql -u root -pmysql -e "SHOW TABLES FROM inventory"
Step 5: Monitor Initial Snapshot Progress
Во время snapshot connector публикует metrics:
# Проверить connector logs
docker compose logs -f debezium-connect | grep -i snapshot
Ожидаемые log messages:
Snapshot is using user 'debezium' with these MySQL grants: [...]
Creating initial consistent snapshot for database 'inventory'
Snapshotting table 'inventory.customers' (1 of 3 tables)
Exporting data from table 'inventory.customers'
For table 'inventory.customers' read 1000 rows in 00:00:01.234
Snapshot completed for table 'inventory.customers'
Snapshotting table 'inventory.orders' (2 of 3 tables)
...
Snapshot completed successfully
Duration: Для sample inventory database (несколько тысяч записей) snapshot займет 10-30 секунд.
Verifying CDC Event Flow: Hands-On
Connector развернут, snapshot завершен. Теперь проверим, что CDC events корректно публикуются в Kafka.
Test 1: Consume Initial Snapshot Events
docker compose exec kafka kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic mysql-server.inventory.customers \
--from-beginning \
--max-messages 1
Ожидаемый event (пример упрощен):
{
"schema": { ... },
"payload": {
"before": null,
"after": {
"id": 1001,
"first_name": "Sally",
"last_name": "Thomas",
"email": "[email protected]"
},
"source": {
"version": "2.5.0.Final",
"connector": "mysql",
"name": "mysql-server",
"ts_ms": 1738406400000,
"snapshot": "true",
"db": "inventory",
"table": "customers",
"server_id": 0,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 154,
"row": 0
},
"op": "r",
"ts_ms": 1738406401234
}
}
Ключевые поля:
"before": null- snapshot event не имеет “before” state"after": {...}- текущее состояние записи"op": "r"- READ operation (snapshot)"snapshot": "true"- event создан во время snapshot"gtid": null- snapshot events не имеют GTID (до streaming phase)
Test 2: Insert New Row (Real-Time CDC)
Подключитесь к MySQL:
docker compose exec mysql mysql -u root -pmysql inventory
Вставьте новую запись:
INSERT INTO customers (first_name, last_name, email)
VALUES ('John', 'Doe', '[email protected]');
Выйдите из MySQL (exit).
Consume event:
docker compose exec kafka kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic mysql-server.inventory.customers \
--from-beginning \
--max-messages 100 | grep '"op":"c"'
Ожидаемый INSERT event:
{
"payload": {
"before": null,
"after": {
"id": 1005,
"first_name": "John",
"last_name": "Doe",
"email": "[email protected]"
},
"source": {
"snapshot": "false",
"db": "inventory",
"table": "customers",
"server_id": 1,
"gtid": "3e11fa47-71ca-11e1-9e33-c80aa9429562:150",
"file": "mysql-bin.000003",
"pos": 2548
},
"op": "c",
"ts_ms": 1738406450000
}
}
Ключевые отличия от snapshot event:
"op": "c"- CREATE operation (INSERT)"snapshot": "false"- real-time binlog streaming"gtid": "..."- GTID position (streaming phase использует GTID)"server_id": 1- MySQL server id (из docker-compose.yml: —server-id=1)
Test 3: Update Row
docker compose exec mysql mysql -u root -pmysql inventory -e "
UPDATE customers SET email = '[email protected]' WHERE id = 1005;
"
Consume event:
docker compose exec kafka kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic mysql-server.inventory.customers \
--from-beginning \
--max-messages 100 | grep '"op":"u"'
UPDATE event structure:
{
"payload": {
"before": {
"id": 1005,
"first_name": "John",
"last_name": "Doe",
"email": "[email protected]"
},
"after": {
"id": 1005,
"first_name": "John",
"last_name": "Doe",
"email": "[email protected]"
},
"source": {
"gtid": "3e11fa47-71ca-11e1-9e33-c80aa9429562:151",
"file": "mysql-bin.000003",
"pos": 2890
},
"op": "u"
}
}
Обратите внимание:
"before"содержит старое значение email"after"содержит новое значение email- Consumer может вычислить, какие колонки изменились
Test 4: Delete Row
docker compose exec mysql mysql -u root -pmysql inventory -e "
DELETE FROM customers WHERE id = 1005;
"
DELETE event structure:
{
"payload": {
"before": {
"id": 1005,
"first_name": "John",
"last_name": "Doe",
"email": "[email protected]"
},
"after": null,
"source": {
"gtid": "3e11fa47-71ca-11e1-9e33-c80aa9429562:152"
},
"op": "d"
}
}
Ключевой момент:
"after": null- запись удалена"before"содержит last known state (это возможно благодаряbinlog-row-image=FULLиз docker-compose.yml)
Если бы binlog-row-image=MINIMAL, DELETE event содержал бы только:
- Primary key в
before - Остальные колонки =
null
С FULL (наша конфигурация): before содержит все колонки, что позволяет consumer знать полное состояние удаленной записи.
Проверка знанийЧем отличаются поля op и snapshot в CDC событиях при initial snapshot и при real-time streaming?
Troubleshooting Common Deployment Errors
Даже при корректной конфигурации могут возникнуть ошибки. Вот самые частые проблемы и их решения.
Error 1: “Server ID already in use”
Полная ошибка:
com.github.shyiko.mysql.binlog.network.ServerException:
Slave has the same server UUID as this server
Причина: database.server.id в connector config совпадает с MySQL server id или другим replica.
Диагностика:
-- Проверить server_id MySQL
docker compose exec mysql mysql -u root -pmysql -e "SHOW VARIABLES LIKE 'server_id'"
-- Результат: 1 (из docker-compose.yml)
-- Проверить server_id в connector config
curl -s http://localhost:8083/connectors/mysql-inventory-connector | jq '.config."database.server.id"'
-- Результат: должно быть != 1
Решение:
- Выбрать уникальный server.id (например, 184054)
- Обновить connector config:
curl -X PUT http://localhost:8083/connectors/mysql-inventory-connector/config \
-H "Content-Type: application/json" \
-d '{
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
...
"database.server.id": "184054",
...
}'
- Перезапустить connector:
curl -X POST http://localhost:8083/connectors/mysql-inventory-connector/restart
Error 2: “binlog_format is not ROW”
Полная ошибка:
The MySQL server is not configured to use a ROW binlog_format,
which is required for this connector to work properly.
Change the MySQL configuration to use a binlog_format=ROW and restart the connector.
Причина: MySQL не настроен на ROW binlog format.
Диагностика:
SHOW VARIABLES LIKE 'binlog_format';
Если результат: STATEMENT или MIXED - проблема подтверждена.
Решение:
В docker-compose.yml добавить (или проверить наличие):
mysql:
command:
- "--binlog-format=ROW"
Перезапустить MySQL:
docker compose restart mysql
Проверить:
SHOW VARIABLES LIKE 'binlog_format';
-- Должно быть: ROW
Перезапустить connector (см. Error 1, step 3).
Error 3: “Database history topic is missing”
Полная ошибка:
The db history topic or its content is fully or partially missing.
Please check database history topic configuration and re-execute the snapshot.
Причина: Schema history topic не создан или был purged.
Диагностика:
# Проверить, существует ли topic
docker compose exec kafka kafka-topics \
--bootstrap-server localhost:9092 \
--list | grep schema-changes
# Проверить конфигурацию retention
docker compose exec kafka kafka-topics \
--bootstrap-server localhost:9092 \
--describe \
--topic schema-changes.mysql-server
Если topic отсутствует или retention.ms != -1 - проблема подтверждена.
Решение:
- Создать topic с infinite retention (см. раздел “Schema History Topic” выше)
- Удалить connector offset (force resnapshot):
# Stop connector
curl -X PUT http://localhost:8083/connectors/mysql-inventory-connector/pause
# Delete connector (removes offset)
curl -X DELETE http://localhost:8083/connectors/mysql-inventory-connector
# Recreate connector
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @mysql-connector-config.json
Connector выполнит full resnapshot.
Error 4: “Cannot replicate because master purged required binary logs”
Полная ошибка:
Got fatal error 1236 from master when reading data from binary log:
'Cannot replicate because the master purged required binary logs'
Причина: Debezium offset указывает на binlog файл, который был purged (удален через retention).
Диагностика:
-- Проверить, какие binlog файлы доступны
SHOW BINARY LOGS;
-- Проверить purged GTIDs
SHOW GLOBAL VARIABLES LIKE 'gtid_purged';
Если Debezium offset GTID находится в gtid_purged range - восстановление невозможно без resnapshot.
Решение:
Option A: Resnapshot (безопасно, но медленно):
# Изменить snapshot.mode на initial
curl -X PUT http://localhost:8083/connectors/mysql-inventory-connector/config \
-H "Content-Type: application/json" \
-d '{
...
"snapshot.mode": "initial",
...
}'
# Delete connector offset and recreate (как в Error 3)
Option B: Prevention (избежать в будущем):
- Увеличить
binlog-expire-logs-secondsв MySQL (например, с 7 до 14 дней) - Настроить heartbeat (если еще не настроен)
- Мониторить Debezium lag (см. Lesson 3, раздел “Мониторинг Binlog Health”)
Error 5: “Access denied for user ‘debezium’”
Полная ошибка:
Access denied for user 'debezium'@'%' to database 'inventory'
Причина: User debezium не имеет необходимых прав.
Диагностика:
SHOW GRANTS FOR 'debezium'@'%';
Проверить наличие:
REPLICATION CLIENTREPLICATION SLAVESELECTна нужных databases
Решение:
Выдать недостающие права:
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
ON *.* TO 'debezium'@'%';
-- Если используется heartbeat
GRANT INSERT, UPDATE ON inventory.debezium_heartbeat TO 'debezium'@'%';
FLUSH PRIVILEGES;
Перезапустить connector.
Debugging Techniques
Для сложных проблем:
1. Проверить connector logs:
docker compose logs debezium-connect | tail -100
Искать ERROR, WARN, Exception.
2. Проверить Kafka Connect worker status:
curl -s http://localhost:8083/ | jq .
Должно вернуть версию и cluster ID.
3. Проверить все connectors:
curl -s http://localhost:8083/connectors | jq .
Убедиться, что mysql-inventory-connector в списке.
4. Проверить connector config:
curl -s http://localhost:8083/connectors/mysql-inventory-connector | jq '.config'
Сравнить с ожидаемой конфигурацией.
5. Check offset storage:
docker compose exec kafka kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic connect-offsets \
--from-beginning \
--property print.key=true \
| grep mysql-inventory-connector
Найти сохраненный GTID position.
Key Takeaways
- MySQL connector отличается от PostgreSQL: Нет replication slots, client-side position tracking, schema history topic обязателен
- database.server.id must be unique в MySQL cluster topology (не совпадать с server.id MySQL primary/replica)
- Schema history topic критичен - создавать до deployment с
retention.ms=-1иretention.bytes=-1 - Обязательно single partition для schema history topic (сохранение порядка DDL statements)
- database.server.name используется как prefix для Kafka topics - нельзя менять после deployment
- snapshot.mode=initial выполняет full snapshot при первом запуске (может занять часы для больших баз)
- Heartbeat (из Lesson 3) предотвращает position loss на idle таблицах - обязателен для production
- REST API deployment паттерн: POST config → Check status → Verify topics → Test CDC events
- CDC event structure:
before(old state),after(new state),op(c/u/d/r),source(GTID, file, pos) - binlog-row-image=FULL позволяет DELETE events содержать все колонки в
beforestate - Common pitfalls: Server ID conflicts, missing schema history topic, binlog purge before reading, insufficient permissions
- Troubleshooting approach: Check connector status → Read logs → Verify MySQL config → Check offset storage
- Deprecated property:
database.history.*→ используйтеschema.history.internal.*для Debezium 2.x
Что дальше?
Вы освоили базовую настройку MySQL CDC connector. Следующий шаг - понять архитектурные отличия между MySQL binlog и PostgreSQL WAL подходами.
Следующий урок: Binlog vs WAL: Архитектурное сравнение
Мы создадим side-by-side comparison:
- Position tracking: GTID (MySQL) vs LSN (PostgreSQL)
- Schema evolution: Schema history topic (MySQL) vs Replication slot + Publication (PostgreSQL)
- Snapshot mechanisms: MySQL locking vs PostgreSQL export snapshot
- Failover scenarios: GTID automatic failover vs manual LSN adjustment
Понимание этих различий критично для дата-инженеров, которые работают с multi-database CDC pipelines (MySQL + PostgreSQL в одной инфраструктуре).
Check Your Understanding
Finished the lesson?
Mark it as complete to track your progress