Skip to content
Learning Platform
Intermediate
30 minutes
mysql debezium kafka-connect cdc rest-api

Prerequisites:

  • module-8/03-binlog-retention-heartbeat

Настройка MySQL CDC коннектора

От теории к практике

В предыдущих трех уроках мы подробно изучили MySQL binlog: архитектуру, GTID mode, retention и heartbeat механизмы. Теперь пришло время применить это знание и настроить полноценный CDC connector для чтения изменений из MySQL.

Почему MySQL connector отличается от PostgreSQL?

Если вы прошли Module 2 (PostgreSQL CDC), вы уже знаете паттерн развертывания Debezium connector через Kafka Connect REST API. Но MySQL connector имеет критические архитектурные отличия:

PostgreSQL (server-side tracking):

  • Replication slot хранит LSN position на сервере PostgreSQL
  • Publication определяет, какие таблицы реплицировать
  • Position tracking автоматически управляется через replication slot

MySQL (client-side tracking):

  • Нет replication slot - позиция хранится в Kafka Connect offset storage
  • database.server.id требуется для интеграции в MySQL cluster topology
  • schema.history.internal.kafka.topic обязателен для восстановления схем после перезапуска

Эти отличия делают MySQL connector проще в некоторых аспектах (не нужно управлять слотами), но требуют дополнительной осторожности в других (schema history topic, server.id uniqueness).

Prerequisites Checklist

Перед началом убедитесь, что у вас выполнены шаги из Phase 12:

MySQL Configuration

  • ✓ MySQL 8.0.40 запущен в Docker Compose
  • binlog-format=ROW (проверено в Lesson 1)
  • gtid-mode=ON (настроено в Lesson 2)
  • binlog-expire-logs-seconds=604800 (7 дней, Lesson 3)
  • ✓ Heartbeat таблица создана (Lesson 3)

Kafka Connect Service

  • ✓ Debezium Connect контейнер запущен (порт 8083)
  • ✓ Kafka cluster доступен (порт 9092)

Проверка готовности:

# MySQL готов?
docker compose exec mysql mysql -u root -pmysql -e "SELECT VERSION()"

# Kafka Connect доступен?
curl -s http://localhost:8083/ | jq .version

# Binlog ROW format?
docker compose exec mysql mysql -u root -pmysql -e "SHOW VARIABLES LIKE 'binlog_format'"

# GTID mode ON?
docker compose exec mysql mysql -u root -pmysql -e "SHOW VARIABLES LIKE 'gtid_mode'"

Если все команды выполнились успешно - вы готовы к настройке connector.

Schema History Topic: КРИТИЧЕСКАЯ подготовка

Danger

MySQL connector требует schema history topic для восстановления схем таблиц при перезапуске. Этот topic должен быть создан до развертывания connector с бесконечным retention.

Почему это критично?

  • Schema history topic содержит все DDL statements (CREATE TABLE, ALTER TABLE) с binlog позициями
  • Debezium использует этот topic для воссоздания схем при чтении старых binlog событий
  • Если topic purged (удален через retention), connector не сможет перезапуститься

Что произойдет, если не настроить retention=-1?

Confluent Kafka по умолчанию хранит данные 7 дней (log.retention.ms=604800000). Через неделю:

  1. Schema history events purged
  2. Connector перезапускается (обновление, failover)
  3. Ошибка: “The db history topic or its content is fully or partially missing”
  4. Требуется полный resnapshot базы данных (часы/дни для больших баз)

Создание Schema History Topic

# Создать topic с infinite retention
docker compose exec kafka kafka-topics \
  --bootstrap-server localhost:9092 \
  --create \
  --topic schema-changes.mysql-server \
  --partitions 1 \
  --replication-factor 1 \
  --config retention.ms=-1 \
  --config retention.bytes=-1

Параметры topic:

ПараметрЗначениеОбоснование
--partitions1Обязательно single partition для сохранения порядка DDL
--replication-factor1В dev среде достаточно. Production: 3
retention.ms-1Infinite retention - данные хранятся вечно
retention.bytes-1No size limit - topic не ограничен по размеру

Проверка конфигурации:

# Verify topic configuration
docker compose exec kafka kafka-topics \
  --bootstrap-server localhost:9092 \
  --describe \
  --topic schema-changes.mysql-server

Ожидаемый output:

Topic: schema-changes.mysql-server	PartitionCount: 1	ReplicationFactor: 1	Configs: retention.ms=-1,retention.bytes=-1
	Topic: schema-changes.mysql-server	Partition: 0	Leader: 1	Replicas: 1	Isr: 1

Ключевые моменты:

  • retention.ms=-1 (infinite retention)
  • retention.bytes=-1 (no size limit)
  • PartitionCount: 1 (single partition)
Tip

Рекомендуемый паттерн: schema-changes.{database.server.name}

Примеры:

  • schema-changes.mysql-server (generic MySQL server)
  • schema-changes.aurora-production (Aurora production cluster)
  • schema-changes.orders-db (orders database cluster)

Такой паттерн позволяет легко идентифицировать, какому connector принадлежит schema history topic.

MySQL Connector Configuration: Полный обзор свойств

Теперь создадим конфигурацию connector. Мы разберем каждое свойство с объяснением его роли.

Структура конфигурации

Создайте файл mysql-connector-config.json:

{
  "name": "mysql-inventory-connector",
  "config": {
    // === CONNECTOR CLASS (Required) ===
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",

    // === DATABASE CONNECTION (Required) ===
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",

    // === MYSQL CLUSTER INTEGRATION (Required, MySQL-specific) ===
    "database.server.id": "184054",
    "database.server.name": "mysql-server",

    // === TABLE FILTERING (Required) ===
    "table.include.list": "inventory.customers,inventory.orders,inventory.products",

    // === SCHEMA HISTORY (Required, MySQL-specific) ===
    "schema.history.internal.kafka.topic": "schema-changes.mysql-server",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",

    // === SNAPSHOT CONFIGURATION (Optional) ===
    "snapshot.mode": "initial",

    // === HEARTBEAT (Recommended from Lesson 3) ===
    "heartbeat.interval.ms": "10000",
    "heartbeat.action.query": "INSERT INTO inventory.debezium_heartbeat (id, ts) VALUES (1, NOW()) ON DUPLICATE KEY UPDATE ts = NOW()"
  }
}

Детальный разбор свойств

1. Connector Class

"connector.class": "io.debezium.connector.mysql.MySqlConnector"

Что это: Java класс, реализующий MySQL CDC connector.

Где находится: Bundled в Debezium Connect Docker image (quay.io/debezium/connect:2.5).

Альтернативы:

  • io.debezium.connector.postgresql.PostgresConnector (для PostgreSQL)
  • io.debezium.connector.mongodb.MongoDbConnector (для MongoDB)

2. Database Connection Properties

"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz"

Что это: Параметры подключения к MySQL.

Важно:

  • database.hostname: Используйте Docker service name (mysql), а не localhost
  • database.user: User должен иметь REPLICATION CLIENT, REPLICATION SLAVE, SELECT права
  • database.password: В production используйте secrets management (Vault, AWS Secrets Manager)

Настройка debezium user (если еще не создан):

CREATE USER 'debezium'@'%' IDENTIFIED BY 'dbz';

GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
  ON *.* TO 'debezium'@'%';

-- Права на heartbeat таблицу
GRANT INSERT, UPDATE ON inventory.debezium_heartbeat TO 'debezium'@'%';

FLUSH PRIVILEGES;

3. MySQL Cluster Integration

Danger
"database.server.id": "184054"

Что это: Уникальный идентификатор Debezium connector в MySQL cluster topology.

Почему критично:

  • MySQL binlog replication требует unique server ID для каждого участника cluster
  • Если server.id совпадает с другим MySQL server или replica, возникает конфликт
  • Конфликт приводит к ошибке: “Slave has the same server UUID as this server”

Как выбрать значение:

EntityTypical server.id RangeExample
MySQL Primary Server1-1001
MySQL Read Replica 1101-200101
MySQL Read Replica 2101-200102
Debezium Connector 1184000-184999184054
Debezium Connector 2184000-184999184055

Проверка uniqueness:

-- На MySQL primary server
SHOW VARIABLES LIKE 'server_id';
-- Результат: 1 (из docker-compose.yml: --server-id=1)

-- На read replica (если есть)
SHOW VARIABLES LIKE 'server_id';
-- Результат: 101, 102, etc.

Убедитесь, что database.server.id в connector config не совпадает ни с одним из этих значений.

"database.server.name": "mysql-server"

Что это: Логическое имя MySQL server, используется как prefix для Kafka topics.

Примеры topic naming:

TableTopic Name
inventory.customersmysql-server.inventory.customers
inventory.ordersmysql-server.inventory.orders
inventory.productsmysql-server.inventory.products

Важно:

  • После развертывания connector нельзя менять database.server.name
  • Изменение приведет к созданию новых topics, loss of offset tracking
  • Используйте meaningful names (например, aurora-production, orders-db)
Проверка знаний
Почему schema history topic необходимо создать с infinite retention (retention.ms=-1) ДО развёртывания MySQL коннектора?
Ответ
Schema history topic хранит все DDL-операции с binlog позициями. Debezium использует его для воссоздания схем таблиц при перезапуске. Если topic будет очищен через стандартный Kafka retention (7 дней), коннектор не сможет восстановиться и потребуется полный resnapshot.

4. Table Filtering

"table.include.list": "inventory.customers,inventory.orders,inventory.products"

Что это: Whitelist таблиц для CDC capture.

Формат: {database}.{table} через запятую.

Поддерживаемые паттерны:

// Все таблицы в database
"table.include.list": "inventory.*"

// Несколько database
"table.include.list": "inventory.*,analytics.*"

// Regex паттерны
"table.include.list": "inventory\\.orders_.*"  // orders_2023, orders_2024, etc.

Alternative: Blacklist:

// Exclude определенные таблицы
"table.exclude.list": "inventory.audit_logs,inventory.temp_.*"

Рекомендация: Используйте explicit include list для production (избегайте wildcards, чтобы случайно не захватить лишние таблицы).

5. Schema History Configuration

"schema.history.internal.kafka.topic": "schema-changes.mysql-server",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092"

Что это: Конфигурация для записи DDL history в Kafka topic.

Почему нужно два свойства:

  • schema.history.internal.kafka.topic: Name of the topic (созданного ранее)
  • schema.history.internal.kafka.bootstrap.servers: Kafka connection (может отличаться от main Kafka cluster)

Что записывается в schema history topic:

  • Все DDL statements: CREATE TABLE, ALTER TABLE ADD COLUMN, DROP COLUMN
  • Binlog position (file:offset или GTID) для каждого DDL statement
  • Используется для recovery: При перезапуске Debezium “воспроизводит” DDL statements для воссоздания схем
Warning

Старое название (Debezium 1.x):

"database.history.internal.kafka.topic": "schema-changes.mysql-server"

Новое название (Debezium 2.0+):

"schema.history.internal.kafka.topic": "schema-changes.mysql-server"

Если вы видите database.history.* в старых туториалах - используйте schema.history.internal.* для Debezium 2.x.

6. Snapshot Configuration

"snapshot.mode": "initial"

Что это: Режим создания initial snapshot при первом запуске connector.

Режимы snapshot:

ModeBehaviorWhen to Use
initialFull snapshot при первом запуске, затем binlog streamingDefault, most common
neverПропустить snapshot, начать сразу с binlogDeprecated, use no_data
no_dataТолько schema snapshot (no data), затем binlogData уже в Kafka topics
when_neededSnapshot только если нет saved offsetResume after failure
schema_onlyТолько CREATE TABLE statements, no dataSchema evolution testing

Как работает initial snapshot:

Snapshot Phase
SELECT * FROM tables
op=r (read events)
Streaming Phase
Read binlog events
op=c/u/d (real-time)
Connector
MySQL
Kafka
SET GLOBAL read_lock (if allowed)SELECT * FROM customers100,000 rowsPublish 100K eventsSELECT * FROM orders500,000 rowsPublish 500K eventsUNLOCK TABLESStart reading binlog from GTIDStream INSERT/UPDATE/DELETEPublish real-time CDC events
Критические параметры
database.server.id
184054
Unique in cluster
schema.history.topic
schema-changes.mysql
retention.ms=-1
heartbeat.interval.ms
10000
Keep offset fresh

Duration snapshot для больших таблиц:

  • 1 GB table ~ 2-5 минут
  • 10 GB table ~ 20-50 минут
  • 100 GB table ~ 3-8 часов
  • 1 TB table ~ 1-3 дня

Для больших баз рекомендуется: Incremental snapshots (advanced topic, вне scope этого урока).

7. Heartbeat Configuration

"heartbeat.interval.ms": "10000",
"heartbeat.action.query": "INSERT INTO inventory.debezium_heartbeat (id, ts) VALUES (1, NOW()) ON DUPLICATE KEY UPDATE ts = NOW()"

Что это: Механизм предотвращения position loss на idle таблицах (подробно изучили в Lesson 3).

Параметры:

  • heartbeat.interval.ms: Интервал между heartbeat events (миллисекунды)

    • 10000ms (10 секунд) - хороший default
    • Production: можно увеличить до 60000ms (1 минута)
  • heartbeat.action.query: SQL запрос для создания heartbeat event

    • Обновляет timestamp в debezium_heartbeat таблице
    • ON DUPLICATE KEY UPDATE избегает накопления записей

Без heartbeat: Idle таблицы могут привести к position loss при binlog purge.

С heartbeat: Offset постоянно обновляется, даже если таблицы idle.

Deploying Connector via REST API: Hands-On Lab

Теперь применим всю конфигурацию и развернем connector.

Step 1: Подготовка конфигурации

Создайте файл в директории labs:

cd labs
nano mysql-connector-config.json

Вставьте полную конфигурацию (см. выше раздел “Структура конфигурации”).

Сохраните файл (Ctrl+O, Enter, Ctrl+X).

Step 2: Deploy Connector

Отправьте POST request к Kafka Connect REST API:

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @mysql-connector-config.json

Ожидаемый успешный response:

{
  "name": "mysql-inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    ...
  },
  "tasks": [],
  "type": "source"
}

Если ошибка: Проверьте синтаксис JSON (запятые, кавычки), убедитесь, что schema history topic создан.

Step 3: Проверка статуса connector

curl -s http://localhost:8083/connectors/mysql-inventory-connector/status | jq .

Успешный output:

{
  "name": "mysql-inventory-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "connect:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "connect:8083"
    }
  ],
  "type": "source"
}

Ключевые индикаторы:

  • "state": "RUNNING" - connector работает
  • tasks[0].state: "RUNNING" - task выполняется
  • Если "state": "FAILED" - проверьте логи: docker compose logs debezium-connect

Step 4: Verify Topics Created

docker compose exec kafka kafka-topics \
  --bootstrap-server localhost:9092 \
  --list | grep mysql-server

Ожидаемые topics:

mysql-server.inventory.customers
mysql-server.inventory.orders
mysql-server.inventory.products

Если topics не появились через 10-20 секунд:

  1. Проверьте connector status (Step 3)
  2. Проверьте логи: docker compose logs debezium-connect | tail -50
  3. Убедитесь, что таблицы существуют в MySQL: docker compose exec mysql mysql -u root -pmysql -e "SHOW TABLES FROM inventory"

Step 5: Monitor Initial Snapshot Progress

Во время snapshot connector публикует metrics:

# Проверить connector logs
docker compose logs -f debezium-connect | grep -i snapshot

Ожидаемые log messages:

Snapshot is using user 'debezium' with these MySQL grants: [...]
Creating initial consistent snapshot for database 'inventory'
Snapshotting table 'inventory.customers' (1 of 3 tables)
Exporting data from table 'inventory.customers'
For table 'inventory.customers' read 1000 rows in 00:00:01.234
Snapshot completed for table 'inventory.customers'
Snapshotting table 'inventory.orders' (2 of 3 tables)
...
Snapshot completed successfully

Duration: Для sample inventory database (несколько тысяч записей) snapshot займет 10-30 секунд.

Verifying CDC Event Flow: Hands-On

Connector развернут, snapshot завершен. Теперь проверим, что CDC events корректно публикуются в Kafka.

Test 1: Consume Initial Snapshot Events

docker compose exec kafka kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic mysql-server.inventory.customers \
  --from-beginning \
  --max-messages 1

Ожидаемый event (пример упрощен):

{
  "schema": { ... },
  "payload": {
    "before": null,
    "after": {
      "id": 1001,
      "first_name": "Sally",
      "last_name": "Thomas",
      "email": "[email protected]"
    },
    "source": {
      "version": "2.5.0.Final",
      "connector": "mysql",
      "name": "mysql-server",
      "ts_ms": 1738406400000,
      "snapshot": "true",
      "db": "inventory",
      "table": "customers",
      "server_id": 0,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 154,
      "row": 0
    },
    "op": "r",
    "ts_ms": 1738406401234
  }
}

Ключевые поля:

  • "before": null - snapshot event не имеет “before” state
  • "after": {...} - текущее состояние записи
  • "op": "r" - READ operation (snapshot)
  • "snapshot": "true" - event создан во время snapshot
  • "gtid": null - snapshot events не имеют GTID (до streaming phase)

Test 2: Insert New Row (Real-Time CDC)

Подключитесь к MySQL:

docker compose exec mysql mysql -u root -pmysql inventory

Вставьте новую запись:

INSERT INTO customers (first_name, last_name, email)
VALUES ('John', 'Doe', '[email protected]');

Выйдите из MySQL (exit).

Consume event:

docker compose exec kafka kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic mysql-server.inventory.customers \
  --from-beginning \
  --max-messages 100 | grep '"op":"c"'

Ожидаемый INSERT event:

{
  "payload": {
    "before": null,
    "after": {
      "id": 1005,
      "first_name": "John",
      "last_name": "Doe",
      "email": "[email protected]"
    },
    "source": {
      "snapshot": "false",
      "db": "inventory",
      "table": "customers",
      "server_id": 1,
      "gtid": "3e11fa47-71ca-11e1-9e33-c80aa9429562:150",
      "file": "mysql-bin.000003",
      "pos": 2548
    },
    "op": "c",
    "ts_ms": 1738406450000
  }
}

Ключевые отличия от snapshot event:

  • "op": "c" - CREATE operation (INSERT)
  • "snapshot": "false" - real-time binlog streaming
  • "gtid": "..." - GTID position (streaming phase использует GTID)
  • "server_id": 1 - MySQL server id (из docker-compose.yml: —server-id=1)

Test 3: Update Row

docker compose exec mysql mysql -u root -pmysql inventory -e "
UPDATE customers SET email = '[email protected]' WHERE id = 1005;
"

Consume event:

docker compose exec kafka kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic mysql-server.inventory.customers \
  --from-beginning \
  --max-messages 100 | grep '"op":"u"'

UPDATE event structure:

{
  "payload": {
    "before": {
      "id": 1005,
      "first_name": "John",
      "last_name": "Doe",
      "email": "[email protected]"
    },
    "after": {
      "id": 1005,
      "first_name": "John",
      "last_name": "Doe",
      "email": "[email protected]"
    },
    "source": {
      "gtid": "3e11fa47-71ca-11e1-9e33-c80aa9429562:151",
      "file": "mysql-bin.000003",
      "pos": 2890
    },
    "op": "u"
  }
}

Обратите внимание:

  • "before" содержит старое значение email
  • "after" содержит новое значение email
  • Consumer может вычислить, какие колонки изменились

Test 4: Delete Row

docker compose exec mysql mysql -u root -pmysql inventory -e "
DELETE FROM customers WHERE id = 1005;
"

DELETE event structure:

{
  "payload": {
    "before": {
      "id": 1005,
      "first_name": "John",
      "last_name": "Doe",
      "email": "[email protected]"
    },
    "after": null,
    "source": {
      "gtid": "3e11fa47-71ca-11e1-9e33-c80aa9429562:152"
    },
    "op": "d"
  }
}

Ключевой момент:

  • "after": null - запись удалена
  • "before" содержит last known state (это возможно благодаря binlog-row-image=FULL из docker-compose.yml)
Tip

Если бы binlog-row-image=MINIMAL, DELETE event содержал бы только:

  • Primary key в before
  • Остальные колонки = null

С FULL (наша конфигурация): before содержит все колонки, что позволяет consumer знать полное состояние удаленной записи.

Проверка знаний
Чем отличаются поля op и snapshot в CDC событиях при initial snapshot и при real-time streaming?
Ответ
Во время snapshot события имеют op="r" (read) и snapshot="true", а GTID равен null. При real-time streaming INSERT имеет op="c" (create), UPDATE — op="u", DELETE — op="d", snapshot="false", и каждое событие содержит GTID позицию.

Troubleshooting Common Deployment Errors

Даже при корректной конфигурации могут возникнуть ошибки. Вот самые частые проблемы и их решения.

Error 1: “Server ID already in use”

Полная ошибка:

com.github.shyiko.mysql.binlog.network.ServerException:
Slave has the same server UUID as this server

Причина: database.server.id в connector config совпадает с MySQL server id или другим replica.

Диагностика:

-- Проверить server_id MySQL
docker compose exec mysql mysql -u root -pmysql -e "SHOW VARIABLES LIKE 'server_id'"
-- Результат: 1 (из docker-compose.yml)

-- Проверить server_id в connector config
curl -s http://localhost:8083/connectors/mysql-inventory-connector | jq '.config."database.server.id"'
-- Результат: должно быть != 1

Решение:

  1. Выбрать уникальный server.id (например, 184054)
  2. Обновить connector config:
curl -X PUT http://localhost:8083/connectors/mysql-inventory-connector/config \
  -H "Content-Type: application/json" \
  -d '{
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    ...
    "database.server.id": "184054",
    ...
  }'
  1. Перезапустить connector:
curl -X POST http://localhost:8083/connectors/mysql-inventory-connector/restart

Error 2: “binlog_format is not ROW”

Полная ошибка:

The MySQL server is not configured to use a ROW binlog_format,
which is required for this connector to work properly.
Change the MySQL configuration to use a binlog_format=ROW and restart the connector.

Причина: MySQL не настроен на ROW binlog format.

Диагностика:

SHOW VARIABLES LIKE 'binlog_format';

Если результат: STATEMENT или MIXED - проблема подтверждена.

Решение:

В docker-compose.yml добавить (или проверить наличие):

mysql:
  command:
    - "--binlog-format=ROW"

Перезапустить MySQL:

docker compose restart mysql

Проверить:

SHOW VARIABLES LIKE 'binlog_format';
-- Должно быть: ROW

Перезапустить connector (см. Error 1, step 3).

Error 3: “Database history topic is missing”

Полная ошибка:

The db history topic or its content is fully or partially missing.
Please check database history topic configuration and re-execute the snapshot.

Причина: Schema history topic не создан или был purged.

Диагностика:

# Проверить, существует ли topic
docker compose exec kafka kafka-topics \
  --bootstrap-server localhost:9092 \
  --list | grep schema-changes

# Проверить конфигурацию retention
docker compose exec kafka kafka-topics \
  --bootstrap-server localhost:9092 \
  --describe \
  --topic schema-changes.mysql-server

Если topic отсутствует или retention.ms != -1 - проблема подтверждена.

Решение:

  1. Создать topic с infinite retention (см. раздел “Schema History Topic” выше)
  2. Удалить connector offset (force resnapshot):
# Stop connector
curl -X PUT http://localhost:8083/connectors/mysql-inventory-connector/pause

# Delete connector (removes offset)
curl -X DELETE http://localhost:8083/connectors/mysql-inventory-connector

# Recreate connector
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @mysql-connector-config.json

Connector выполнит full resnapshot.

Error 4: “Cannot replicate because master purged required binary logs”

Полная ошибка:

Got fatal error 1236 from master when reading data from binary log:
'Cannot replicate because the master purged required binary logs'

Причина: Debezium offset указывает на binlog файл, который был purged (удален через retention).

Диагностика:

-- Проверить, какие binlog файлы доступны
SHOW BINARY LOGS;

-- Проверить purged GTIDs
SHOW GLOBAL VARIABLES LIKE 'gtid_purged';

Если Debezium offset GTID находится в gtid_purged range - восстановление невозможно без resnapshot.

Решение:

Option A: Resnapshot (безопасно, но медленно):

# Изменить snapshot.mode на initial
curl -X PUT http://localhost:8083/connectors/mysql-inventory-connector/config \
  -H "Content-Type: application/json" \
  -d '{
    ...
    "snapshot.mode": "initial",
    ...
  }'

# Delete connector offset and recreate (как в Error 3)

Option B: Prevention (избежать в будущем):

  1. Увеличить binlog-expire-logs-seconds в MySQL (например, с 7 до 14 дней)
  2. Настроить heartbeat (если еще не настроен)
  3. Мониторить Debezium lag (см. Lesson 3, раздел “Мониторинг Binlog Health”)

Error 5: “Access denied for user ‘debezium’”

Полная ошибка:

Access denied for user 'debezium'@'%' to database 'inventory'

Причина: User debezium не имеет необходимых прав.

Диагностика:

SHOW GRANTS FOR 'debezium'@'%';

Проверить наличие:

  • REPLICATION CLIENT
  • REPLICATION SLAVE
  • SELECT на нужных databases

Решение:

Выдать недостающие права:

GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
  ON *.* TO 'debezium'@'%';

-- Если используется heartbeat
GRANT INSERT, UPDATE ON inventory.debezium_heartbeat TO 'debezium'@'%';

FLUSH PRIVILEGES;

Перезапустить connector.

Debugging Techniques

Для сложных проблем:

1. Проверить connector logs:

docker compose logs debezium-connect | tail -100

Искать ERROR, WARN, Exception.

2. Проверить Kafka Connect worker status:

curl -s http://localhost:8083/ | jq .

Должно вернуть версию и cluster ID.

3. Проверить все connectors:

curl -s http://localhost:8083/connectors | jq .

Убедиться, что mysql-inventory-connector в списке.

4. Проверить connector config:

curl -s http://localhost:8083/connectors/mysql-inventory-connector | jq '.config'

Сравнить с ожидаемой конфигурацией.

5. Check offset storage:

docker compose exec kafka kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic connect-offsets \
  --from-beginning \
  --property print.key=true \
  | grep mysql-inventory-connector

Найти сохраненный GTID position.

Key Takeaways

  • MySQL connector отличается от PostgreSQL: Нет replication slots, client-side position tracking, schema history topic обязателен
  • database.server.id must be unique в MySQL cluster topology (не совпадать с server.id MySQL primary/replica)
  • Schema history topic критичен - создавать до deployment с retention.ms=-1 и retention.bytes=-1
  • Обязательно single partition для schema history topic (сохранение порядка DDL statements)
  • database.server.name используется как prefix для Kafka topics - нельзя менять после deployment
  • snapshot.mode=initial выполняет full snapshot при первом запуске (может занять часы для больших баз)
  • Heartbeat (из Lesson 3) предотвращает position loss на idle таблицах - обязателен для production
  • REST API deployment паттерн: POST config → Check status → Verify topics → Test CDC events
  • CDC event structure: before (old state), after (new state), op (c/u/d/r), source (GTID, file, pos)
  • binlog-row-image=FULL позволяет DELETE events содержать все колонки в before state
  • Common pitfalls: Server ID conflicts, missing schema history topic, binlog purge before reading, insufficient permissions
  • Troubleshooting approach: Check connector status → Read logs → Verify MySQL config → Check offset storage
  • Deprecated property: database.history.* → используйте schema.history.internal.* для Debezium 2.x

Что дальше?

Вы освоили базовую настройку MySQL CDC connector. Следующий шаг - понять архитектурные отличия между MySQL binlog и PostgreSQL WAL подходами.

Следующий урок: Binlog vs WAL: Архитектурное сравнение

Мы создадим side-by-side comparison:

  • Position tracking: GTID (MySQL) vs LSN (PostgreSQL)
  • Schema evolution: Schema history topic (MySQL) vs Replication slot + Publication (PostgreSQL)
  • Snapshot mechanisms: MySQL locking vs PostgreSQL export snapshot
  • Failover scenarios: GTID automatic failover vs manual LSN adjustment

Понимание этих различий критично для дата-инженеров, которые работают с multi-database CDC pipelines (MySQL + PostgreSQL в одной инфраструктуре).

Check Your Understanding

Score: 0 of 0
Applied
Question 1 of 8. Инженер подготовил конфигурацию MySQL Debezium connector для production: { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.hostname": "mysql-primary.prod", "database.port": "3306", "database.user": "debezium", "database.password": "dbz_prod_secret", "database.server.name": "orders-db", "table.include.list": "shop.orders,shop.customers", "snapshot.mode": "initial" } Каких двух критических свойств не хватает для production deployment?

Finished the lesson?

Mark it as complete to track your progress