Требуемые знания:
- module-8/05-binlog-wal-comparison
Schema History Topic: Критический Компонент Recovery
Если вы прошли Модуль 2, вы знаете, что PostgreSQL connector не требует дополнительной конфигурации для хранения схем — все метаданные о структуре таблиц встроены в WAL события. MySQL работает иначе.
Schema history topic — это специальный Kafka топик, где Debezium MySQL connector хранит всю историю DDL-изменений (CREATE TABLE, ALTER TABLE) с привязкой к binlog позициям. Это память коннектора, без которой он не может восстановиться после перезапуска.
В этом уроке мы погрузимся в архитектуру schema history, разберём критические требования к retention, изучим сценарии восстановления при потере данных и научимся мониторить здоровье этого топика.
PostgreSQL vs MySQL: PostgreSQL не нуждается в schema history topic, потому что logical decoding события содержат полную информацию о структуре таблиц. MySQL binlog содержит только TABLE_MAP события с table_id — коннектор должен самостоятельно восстанавливать схему.
Почему MySQL нуждается в Schema History?
Разберём фундаментальное различие между PostgreSQL и MySQL CDC.
PostgreSQL: Схема в каждом событии
// PostgreSQL WAL событие (через pgoutput)
{
"schema": {
"type": "struct",
"fields": [
{"field": "id", "type": "int64"},
{"field": "name", "type": "string"},
{"field": "email", "type": "string"}
]
},
"payload": {
"before": null,
"after": {"id": 1, "name": "Alice", "email": "[email protected]"}
}
}
Каждое событие содержит полную информацию о схеме. Даже если connector перезапустится, он может декодировать события без дополнительного контекста.
MySQL: TABLE_MAP только с table_id
// MySQL binlog событие (ROW формат)
TABLE_MAP_EVENT:
table_id: 108
database: inventory
table: customers
(схема НЕ включена)
WRITE_ROWS_EVENT:
table_id: 108
row_data: [1, "Alice", "[email protected]"]
Проблема: WRITE_ROWS_EVENT содержит только сырые байты данных и table_id. Чтобы интерпретировать эти байты, коннектор должен знать:
- Сколько колонок в таблице?
- Какие типы данных?
- Какие имена колонок?
Откуда коннектор получает эту информацию? Из schema history topic.
Единственное решение — полный resnapshot базы данных.
Вывод: Schema history topic — это память коннектора о DDL-изменениях. Без него MySQL connector не может восстановиться после перезапуска.
Как работает Schema History?
Разберём жизненный цикл schema history от initial snapshot до recovery.
Фаза 1: Initial Snapshot
При первом запуске коннектор делает полный снимок таблиц и записывает их DDL в schema history topic.
-- Текущая схема базы данных
CREATE TABLE customers (
id INT PRIMARY KEY,
name VARCHAR(255),
email VARCHAR(255)
);
CREATE TABLE orders (
id INT PRIMARY KEY,
customer_id INT,
created_at DATETIME
);
Что записывается в schema history topic:
// Сообщение 1: Database structure
{
"position": {"file": "mysql-bin.000001", "pos": 154},
"databaseName": "inventory",
"ddl": "CREATE DATABASE IF NOT EXISTS `inventory`",
"timestamp": 1706745600000
}
// Сообщение 2: Table structure
{
"position": {"file": "mysql-bin.000001", "pos": 234},
"databaseName": "inventory",
"ddl": "CREATE TABLE `customers` (
`id` int NOT NULL,
`name` varchar(255),
`email` varchar(255),
PRIMARY KEY (`id`)
)",
"timestamp": 1706745601000
}
// Сообщение 3: Another table
{
"position": {"file": "mysql-bin.000001", "pos": 456},
"databaseName": "inventory",
"ddl": "CREATE TABLE `orders` (...)",
"timestamp": 1706745602000
}
Фаза 2: Streaming Changes (DDL записывается автоматически)
Когда приложение выполняет DDL операцию, Debezium автоматически записывает её в schema history.
-- DBA выполняет миграцию
ALTER TABLE customers ADD COLUMN phone VARCHAR(20);
Debezium детектирует DDL из binlog:
QUERY_EVENT:
database: inventory
query: ALTER TABLE customers ADD COLUMN phone VARCHAR(20)
binlog_position: (mysql-bin.000003, 1024)
Запись в schema history topic:
{
"position": {"file": "mysql-bin.000003", "pos": 1024},
"databaseName": "inventory",
"ddl": "ALTER TABLE `customers` ADD COLUMN `phone` varchar(20)",
"timestamp": 1706755600000
}
Фаза 3: Connector Restart (Replay DDL History)
При перезапуске коннектор должен восстановить схему всех таблиц на момент сохранённого offset.
Ключевой момент: Коннектор читает schema history topic от начала до конца, применяя DDL последовательно до сохранённого binlog offset. Это даёт ему точное состояние схемы на момент остановки.
Schema History Topic Configuration
Теперь разберём, как правильно настроить schema history topic для production.
Обязательные параметры
{
"name": "mysql-inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
// Обязательно: имя топика для schema history
"schema.history.internal.kafka.topic": "schema-changes.mysql-inventory",
// Обязательно: Kafka bootstrap servers
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
// Остальная конфигурация...
"database.hostname": "mysql",
"database.port": "3306",
"database.server.id": "184054",
"database.server.name": "mysql-server",
"table.include.list": "inventory.*"
}
}
Важно:
- Используйте уникальное имя топика для каждого коннектора
- Если коннекторов несколько, НЕ используйте общий schema history topic
Опциональные параметры (recovery tuning)
{
"config": {
// ... обязательные параметры ...
// Попытки восстановления при старте (default: 4)
"schema.history.internal.kafka.recovery.attempts": "10",
// Интервал между попытками (default: 500ms)
"schema.history.internal.kafka.recovery.poll.interval.ms": "1000",
// Timeout для чтения истории (default: 3000ms)
"schema.history.internal.consumer.sasl.login.refresh.timeout.ms": "10000"
}
}
Когда нужны эти параметры:
- Если Kafka cluster временно недоступен при старте коннектора
- Если schema history topic большой и требует времени на чтение
- Для более надёжного recovery в нестабильной сети
Naming convention: Используйте префикс schema-changes.{connector-name} для ясности. Примеры: schema-changes.mysql-inventory, schema-changes.aurora-production.
КРИТИЧНО: Retention Configuration
Это самая частая причина катастрофических сбоев Debezium MySQL connector в production.
7-дневная бомба замедленного действия
По умолчанию Confluent Kafka использует log.retention.ms=604800000 (7 дней). Если вы не изменили retention для schema history topic, через 7 дней старые DDL записи будут удалены.
Что произойдёт:
- Коннектор работает нормально первые 7 дней
- На 8-й день retention policy удаляет старые сообщения
- Коннектор перезапускается (deploy, failover, restart)
- ОШИБКА: “The db history topic or its content is fully or partially missing”
- Коннектор НЕ может восстановиться — требуется полный resnapshot
Правильная конфигурация: Infinite Retention
Schema history topic ВСЕГДА должен иметь бесконечный retention.
# Создайте schema history topic ДО деплоя коннектора
kafka-topics --bootstrap-server kafka:9092 \
--create \
--topic schema-changes.mysql-inventory \
--partitions 1 \
--replication-factor 3 \
--config retention.ms=-1 \
--config retention.bytes=-1
# Проверьте настройки
kafka-topics --bootstrap-server kafka:9092 \
--describe \
--topic schema-changes.mysql-inventory
Ожидаемый вывод:
Topic: schema-changes.mysql-inventory
PartitionCount: 1
ReplicationFactor: 3
Configs: retention.ms=-1,retention.bytes=-1
Если топик уже существует с неправильным retention
# Изменить retention на существующем топике
kafka-configs --bootstrap-server kafka:9092 \
--entity-type topics \
--entity-name schema-changes.mysql-inventory \
--alter \
--add-config retention.ms=-1,retention.bytes=-1
# Проверить изменения
kafka-configs --bootstrap-server kafka:9092 \
--entity-type topics \
--entity-name schema-changes.mysql-inventory \
--describe
Если retention уже удалил сообщения, изменение конфигурации НЕ вернёт их обратно. Потребуется resnapshot.
Проверка знанийКак коннектор Debezium использует schema history topic при перезапуске?
Topic Requirements (Anti-Patterns)
Schema history topic имеет строгие требования к конфигурации.
| Требование | Значение | Почему | Что будет, если нарушить |
|---|---|---|---|
| Partitions | 1 (ОДНА партиция) | DDL события должны читаться в строгом порядке | Схема будет применяться в неправильном порядке → corruption |
| Retention time | -1 (бесконечный) | Коннектор читает ВСЮ историю от начала | Recovery fails после 7 дней (default retention) |
| Retention bytes | -1 (бесконечный) | Топик может вырасти до нескольких MB | Topic purge → recovery fails |
| Cleanup policy | delete (НЕ compact) | Нужна полная история, не последнее состояние | Compaction удалит промежуточные DDL → schema mismatch |
| Replication factor | ≥ 3 (production) | Отказоустойчивость топика | Потеря broker → потеря истории → resnapshot |
Команда для создания с правильными настройками:
kafka-topics --bootstrap-server kafka:9092 \
--create \
--topic schema-changes.mysql-inventory \
--partitions 1 \
--replication-factor 3 \
--config retention.ms=-1 \
--config retention.bytes=-1 \
--config cleanup.policy=delete \
--config min.insync.replicas=2
Anti-pattern: Shared Schema History Topic
Никогда не используйте один schema history topic для нескольких коннекторов. Каждый коннектор должен иметь свой уникальный топик.
Почему: Коннектор A прочитает DDL от коннектора B → schema mismatch → corrupt events.
Что хранится в Schema History Topic?
Разберём внутренний формат записей для понимания механизма recovery.
Формат сообщения
Каждая DDL операция записывается как JSON сообщение (не Avro).
{
"source": {
"version": "2.5.0.Final",
"connector": "mysql",
"name": "mysql-server",
"ts_ms": 1706745600000,
"snapshot": "false",
"db": "inventory",
"table": "customers"
},
"position": {
"transaction_id": null,
"ts_sec": 1706745600,
"file": "mysql-bin.000003",
"pos": 1024,
"gtids": "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-150",
"server_id": 1
},
"databaseName": "inventory",
"schemaName": null,
"ddl": "ALTER TABLE `customers` ADD COLUMN `phone` varchar(20) AFTER `email`",
"tableChanges": [
{
"type": "ALTER",
"id": "\"inventory\".\"customers\"",
"table": {
"defaultCharsetName": "utf8mb4",
"primaryKeyColumnNames": ["id"],
"columns": [
{"name": "id", "jdbcType": 4, "typeName": "INT"},
{"name": "name", "jdbcType": 12, "typeName": "VARCHAR", "length": 255},
{"name": "email", "jdbcType": 12, "typeName": "VARCHAR", "length": 255},
{"name": "phone", "jdbcType": 12, "typeName": "VARCHAR", "length": 20}
]
}
}
]
}
Ключевые поля:
-
position: Binlog позиция, на которой произошло DDLfile+pos— для file:offset trackinggtids— для GTID tracking
-
ddl: Полный SQL текст DDL операции- Используется для человека (логирование, debugging)
-
tableChanges: Структурированное описание схемы- Используется Debezium для декодирования row events
- Содержит полные метаданные колонок (типы, длина, constraints)
Пример последовательности событий
# Читаем schema history topic
kafka-console-consumer --bootstrap-server kafka:9092 \
--topic schema-changes.mysql-inventory \
--from-beginning \
--property print.key=true
Вывод (упрощённо):
null {"ddl":"CREATE DATABASE `inventory`","position":{"file":"mysql-bin.000001","pos":154}}
null {"ddl":"CREATE TABLE `customers` (id INT, name VARCHAR(255))","position":{"file":"mysql-bin.000001","pos":234}}
null {"ddl":"CREATE TABLE `orders` (...)","position":{"file":"mysql-bin.000001","pos":456}}
null {"ddl":"ALTER TABLE `customers` ADD COLUMN `email` VARCHAR(255)","position":{"file":"mysql-bin.000002","pos":789}}
null {"ddl":"ALTER TABLE `customers` ADD COLUMN `phone` VARCHAR(20)","position":{"file":"mysql-bin.000003","pos":1024}}
При restart коннектор:
- Читает все 5 сообщений от начала
- Применяет DDL последовательно
- Получает текущую схему:
customers (id, name, email, phone) - Возобновляет чтение binlog с сохранённого offset
Recovery Scenarios
Разберём типичные сценарии сбоев и процедуры восстановления.
Scenario A: Normal Connector Restart ✅
Что происходит:
- Connector останавливается (deploy, reschedule, config update)
- Connector стартует заново
Процесс recovery:
Действия администратора: Никаких. Recovery автоматический.
Время recovery: Секунды (зависит от размера schema history topic)
Scenario B: Schema History Topic Partially Purged ⚠️
Симптом:
ERROR: The db history topic 'schema-changes.mysql-inventory' is fully or partially missing.
Причина:
- Default Kafka retention (7 дней) удалил старые DDL записи
- Connector не может восстановить схему с initial snapshot
Диагностика:
# Проверить retention settings
kafka-configs --bootstrap-server kafka:9092 \
--entity-type topics \
--entity-name schema-changes.mysql-inventory \
--describe
# Если retention.ms != -1, это причина
Recovery Option 1: Resnapshot (МЕДЛЕННО)
Resnapshot может занять часы для больших таблиц
Для базы данных с таблицей на 100GB, initial snapshot может выполняться 2-4 часа. Учитывайте downtime.
# 1. Удалить коннектор
curl -X DELETE http://localhost:8083/connectors/mysql-inventory-connector
# 2. Удалить schema history topic
kafka-topics --bootstrap-server kafka:9092 \
--delete \
--topic schema-changes.mysql-inventory
# 3. Создать топик с правильным retention
kafka-topics --bootstrap-server kafka:9092 \
--create \
--topic schema-changes.mysql-inventory \
--partitions 1 \
--replication-factor 3 \
--config retention.ms=-1 \
--config retention.bytes=-1
# 4. Удалить Kafka Connect offsets (опционально)
# Это заставит коннектор начать с snapshot.mode=initial
kafka-console-consumer --bootstrap-server kafka:9092 \
--topic connect-offsets \
--from-beginning \
--property print.key=true | grep mysql-inventory-connector
# (manually delete offset records via Kafka Connect REST API)
# 5. Пересоздать коннектор с snapshot.mode=initial
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "mysql-inventory-connector",
"config": {
"snapshot.mode": "initial",
...
}
}'
Recovery Option 2: Restore from Backup (БЫСТРО)
Если у вас есть backup schema history topic (см. секцию “Backup Procedures”):
# 1. Удалить повреждённый топик
kafka-topics --bootstrap-server kafka:9092 \
--delete \
--topic schema-changes.mysql-inventory
# 2. Создать новый топик
kafka-topics --bootstrap-server kafka:9092 \
--create \
--topic schema-changes.mysql-inventory \
--partitions 1 \
--replication-factor 3 \
--config retention.ms=-1
# 3. Восстановить из backup
kafka-console-producer --bootstrap-server kafka:9092 \
--topic schema-changes.mysql-inventory \
< schema-history-backup-2026-01-31.json
# 4. Перезапустить коннектор
curl -X POST http://localhost:8083/connectors/mysql-inventory-connector/restart
Время recovery: 1-5 минут (vs часы для resnapshot)
Scenario C: Schema History Topic Corrupted 💥
Симптом:
- Events содержат неправильные колонки
- Schema parsing errors в логах Debezium
- Events с данными, не соответствующими текущей схеме
ERROR: Column count doesn't match value count at row 1
ERROR: Unknown column 'old_column_name' in field list
Причина:
- Ручное редактирование schema history topic
- Сетевые сбои во время записи DDL
- Kafka broker corruption
Диагностика:
# Прочитать schema history topic
kafka-console-consumer --bootstrap-server kafka:9092 \
--topic schema-changes.mysql-inventory \
--from-beginning
# Проверить на:
# - Неполные JSON сообщения
# - Сообщения с неправильными binlog positions
# - Дублирующиеся DDL операции
Recovery (ТОЛЬКО resnapshot):
# 1. Остановить коннектор
curl -X DELETE http://localhost:8083/connectors/mysql-inventory-connector
# 2. Удалить повреждённый schema history topic
kafka-topics --bootstrap-server kafka:9092 \
--delete \
--topic schema-changes.mysql-inventory
# 3. Создать новый топик с правильной конфигурацией
kafka-topics --bootstrap-server kafka:9092 \
--create \
--topic schema-changes.mysql-inventory \
--partitions 1 \
--replication-factor 3 \
--config retention.ms=-1 \
--config retention.bytes=-1
# 4. Пересоздать коннектор с snapshot.mode=initial
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @mysql-connector-config.json
Scenario D: Multiple Connectors Sharing Topic 🚫
Anti-pattern: Shared schema history topic
Это критическая ошибка конфигурации, которая приводит к непредсказуемым сбоям.
Симптом:
- Random schema errors (временные, непостоянные)
- Connector A видит DDL от Connector B
- Events с колонками, которые не существуют в таблице
ERROR: Column 'customer_id' doesn't exist in table 'products'
(Connector A прочитал DDL от Connector B, который работает с другой базой)
Причина:
- Два коннектора используют одинаковое имя schema history topic
- Или используется
topic.prefixбез уникального schema history topic
Неправильная конфигурация:
// Connector A
{
"name": "mysql-connector-a",
"config": {
"schema.history.internal.kafka.topic": "schema-changes.shared" // ❌ WRONG
}
}
// Connector B
{
"name": "mysql-connector-b",
"config": {
"schema.history.internal.kafka.topic": "schema-changes.shared" // ❌ WRONG
}
}
Правильная конфигурация:
// Connector A
{
"name": "mysql-connector-a",
"config": {
"schema.history.internal.kafka.topic": "schema-changes.mysql-connector-a" // ✅ UNIQUE
}
}
// Connector B
{
"name": "mysql-connector-b",
"config": {
"schema.history.internal.kafka.topic": "schema-changes.mysql-connector-b" // ✅ UNIQUE
}
}
Recovery:
# 1. Остановить оба коннектора
curl -X DELETE http://localhost:8083/connectors/mysql-connector-a
curl -X DELETE http://localhost:8083/connectors/mysql-connector-b
# 2. Удалить shared schema history topic
kafka-topics --bootstrap-server kafka:9092 \
--delete \
--topic schema-changes.shared
# 3. Создать уникальные топики для каждого коннектора
kafka-topics --bootstrap-server kafka:9092 \
--create \
--topic schema-changes.mysql-connector-a \
--partitions 1 --replication-factor 3 --config retention.ms=-1
kafka-topics --bootstrap-server kafka:9092 \
--create \
--topic schema-changes.mysql-connector-b \
--partitions 1 --replication-factor 3 --config retention.ms=-1
# 4. Пересоздать коннекторы с правильными настройками и snapshot.mode=initial
curl -X POST http://localhost:8083/connectors -d @connector-a-config.json
curl -X POST http://localhost:8083/connectors -d @connector-b-config.json
Monitoring Schema History Health
Проактивный мониторинг schema history topic предотвращает катастрофы.
Метрики для отслеживания
| Метрика | Что мониторить | Нормальное поведение | Alert threshold |
|---|---|---|---|
| Topic size | Размер топика в MB | Растёт со временем (DDL добавляются) | Внезапное падение до 0 |
| Message count | Количество DDL сообщений | Медленный рост (редкие DDL) | Внезапное уменьшение |
| Consumer lag | Коннектор читает топик | 0 (коннектор читает при старте) | Lag > 0 при старте |
| Retention config | retention.ms | -1 (infinite) | Любое значение ≠ -1 |
Kafka Commands для мониторинга
# 1. Проверить retention settings
kafka-configs --bootstrap-server kafka:9092 \
--entity-type topics \
--entity-name schema-changes.mysql-inventory \
--describe | grep retention
# Ожидается: retention.ms=-1, retention.bytes=-1
# 2. Проверить размер топика
kafka-log-dirs --bootstrap-server kafka:9092 \
--topic-list schema-changes.mysql-inventory \
--describe | grep size
# 3. Подсчитать сообщения в топике
kafka-run-class kafka.tools.GetOffsetShell \
--bootstrap-server kafka:9092 \
--topic schema-changes.mysql-inventory | \
awk -F: '{sum += $3} END {print "Total messages:", sum}'
# 4. Проверить, что топик не compacted
kafka-topics --bootstrap-server kafka:9092 \
--describe \
--topic schema-changes.mysql-inventory | grep cleanup.policy
# Ожидается: cleanup.policy=delete (NOT compact)
Alerts для настройки
# Prometheus AlertManager rule (пример)
groups:
- name: debezium_schema_history
rules:
- alert: SchemaHistoryRetentionIncorrect
expr: |
kafka_topic_config{topic=~"schema-changes.*",key="retention.ms"} != -1
for: 5m
annotations:
summary: "Schema history topic has finite retention (CRITICAL)"
- alert: SchemaHistoryTopicSizeDecreased
expr: |
delta(kafka_log_size{topic=~"schema-changes.*"}[1h]) < 0
for: 5m
annotations:
summary: "Schema history topic size decreased (possible purge)"
- alert: SchemaHistoryPartitionCountWrong
expr: |
kafka_topic_partitions{topic=~"schema-changes.*"} != 1
for: 5m
annotations:
summary: "Schema history topic has multiple partitions (MISCONFIGURED)"
Proactive monitoring saves hours of downtime
Настройте alert на retention.ms != -1 в первый же день. Это предотвратит 7-дневную бомбу.
Backup and Restore Procedures
Backup schema history topic значительно ускоряет recovery.
Почему backup критичен?
Сценарий без backup:
- Schema history topic purged или corrupted
- Единственный вариант: full resnapshot
- Для 100GB таблицы: 2-4 часа downtime
Сценарий с backup:
- Restore schema history topic из backup
- Restart коннектора
- Recovery за 1-5 минут
Как создать backup
# Экспорт schema history topic в файл
kafka-console-consumer --bootstrap-server kafka:9092 \
--topic schema-changes.mysql-inventory \
--from-beginning \
--property print.key=true \
--property print.timestamp=true \
--timeout-ms 10000 \
> schema-history-backup-$(date +%Y-%m-%d).json
# Пример вывода файла:
# CreateTime:1706745600000 null {"ddl":"CREATE TABLE ...","position":{...}}
# CreateTime:1706745601000 null {"ddl":"ALTER TABLE ...","position":{...}}
Рекомендация по частоте backup:
- Daily backup для production коннекторов
- После каждого DDL изменения (для критических систем)
- Retention: Храните backups минимум 30 дней
Автоматизация с cron:
#!/bin/bash
# /usr/local/bin/backup-schema-history.sh
KAFKA_BROKER="kafka:9092"
TOPIC="schema-changes.mysql-inventory"
BACKUP_DIR="/backups/debezium-schema-history"
DATE=$(date +%Y-%m-%d)
mkdir -p "$BACKUP_DIR"
kafka-console-consumer --bootstrap-server "$KAFKA_BROKER" \
--topic "$TOPIC" \
--from-beginning \
--property print.key=true \
--timeout-ms 10000 \
> "$BACKUP_DIR/schema-history-$DATE.json"
# Compress старых backups (опционально)
gzip "$BACKUP_DIR/schema-history-$DATE.json"
# Удалить backups старше 30 дней
find "$BACKUP_DIR" -name "schema-history-*.json.gz" -mtime +30 -delete
Cron schedule (ежедневно в 2 AM):
0 2 * * * /usr/local/bin/backup-schema-history.sh
Как восстановить из backup
# 1. Создать новый schema history topic (если удалён)
kafka-topics --bootstrap-server kafka:9092 \
--create \
--topic schema-changes.mysql-inventory \
--partitions 1 \
--replication-factor 3 \
--config retention.ms=-1 \
--config retention.bytes=-1
# 2. Распаковать backup (если gzipped)
gunzip schema-history-backup-2026-01-31.json.gz
# 3. Восстановить сообщения в топик
# ВАЖНО: Удалите timestamps и keys из backup файла перед restore
cat schema-history-backup-2026-01-31.json | \
awk -F'\t' '{print $3}' | \
kafka-console-producer --bootstrap-server kafka:9092 \
--topic schema-changes.mysql-inventory
# 4. Проверить, что сообщения загружены
kafka-run-class kafka.tools.GetOffsetShell \
--bootstrap-server kafka:9092 \
--topic schema-changes.mysql-inventory
# 5. Перезапустить коннектор
curl -X POST http://localhost:8083/connectors/mysql-inventory-connector/restart
Restore must use exact same topic name
Schema history topic name hardcoded в коннекторе. Если вы restore в топик с другим именем, коннектор не найдёт историю.
Проверка знанийПочему нельзя использовать один schema history topic для нескольких Debezium коннекторов?
Schema Evolution Best Practices
Как безопасно выполнять DDL изменения в production CDC пайплайне.
Compatible Schema Changes (Seamless)
Эти DDL операции НЕ ломают CDC пайплайн:
-- ✅ ADD COLUMN (nullable или с default)
ALTER TABLE customers ADD COLUMN phone VARCHAR(20);
ALTER TABLE customers ADD COLUMN created_at DATETIME DEFAULT CURRENT_TIMESTAMP;
-- ✅ ADD INDEX (не влияет на CDC)
CREATE INDEX idx_email ON customers(email);
-- ✅ RENAME COLUMN (Debezium видит новое имя)
ALTER TABLE customers RENAME COLUMN name TO full_name;
Что происходит:
- MySQL выполняет DDL
- Debezium записывает DDL в schema history topic
- Последующие события используют новую схему
- Consumers получают events с новыми колонками
No downtime, no coordination needed.
Incompatible Schema Changes (Requires Coordination)
Эти DDL операции могут сломать consumers:
-- ⚠️ DROP COLUMN
ALTER TABLE customers DROP COLUMN email;
-- ⚠️ CHANGE COLUMN TYPE
ALTER TABLE customers MODIFY COLUMN id BIGINT;
-- ⚠️ RENAME TABLE
RENAME TABLE customers TO users;
Проблема: Consumers ожидают старую схему.
Best practice workflow:
1. Deploy new consumer version (supports both old and new schema)
2. Wait for all consumers to upgrade
3. Execute DDL in MySQL
4. Debezium automatically propagates schema change
5. Old consumer version gracefully handles missing column (if coded defensively)
6. After migration period, deploy final consumer version (only new schema)
Test schema changes in staging first
Всегда тестируйте DDL миграции в staging environment с полным CDC пайплайном. Это выявит проблемы совместимости до production deploy.
Handling Large DDL Operations
-- Операция, которая может занять минуты/часы
ALTER TABLE large_table ADD COLUMN metadata JSON;
Проблемы:
- MySQL блокирует таблицу на время DDL (зависит от версии и типа операции)
- Debezium может видеть timeout если операция слишком долгая
- Binlog position не двигается во время DDL
Рекомендации:
-
Используйте Online DDL (MySQL 8.0+)
ALTER TABLE large_table ADD COLUMN metadata JSON, ALGORITHM=INPLACE, LOCK=NONE; -
Для старых MySQL версий: pt-online-schema-change
pt-online-schema-change --alter "ADD COLUMN metadata JSON" \ D=inventory,t=large_table \ --execute -
Monitor Debezium during DDL
- Watch connector status:
curl http://localhost:8083/connectors/mysql-inventory-connector/status - Check for timeout errors in logs
- Watch connector status:
Ключевые выводы
- Schema history topic — это память MySQL CDC коннектора — без него невозможен recovery после restart
- PostgreSQL не нуждается в schema history — WAL события self-contained, MySQL binlog требует TABLE_MAP reconstruction
- Retention MUST be infinite —
retention.ms=-1иretention.bytes=-1обязательны, иначе 7-дневная бомба - Single partition required — DDL события должны применяться в строгом порядке
- Unique topic per connector — sharing schema history topic между коннекторами ведёт к corruption
- Backup schema history topic — restore за минуты vs resnapshot за часы
- Monitor retention config — alert на
retention.ms != -1предотвращает катастрофы - Compatible DDL seamless — ADD COLUMN, ADD INDEX автоматически propagate
- Incompatible DDL requires coordination — DROP COLUMN, RENAME TABLE нужна синхронизация с consumers
- Test DDL changes in staging — полный CDC пайплайн должен быть протестирован до production
- Never manually edit schema history topic — corruption ведёт к mandatory resnapshot
- Schema history NOT compacted —
cleanup.policy=delete, нужна полная история
Что дальше?
Мы разобрали критическую роль schema history topic для MySQL CDC recovery. Теперь вы знаете, как правильно настроить retention, восстановиться из backup и мониторить здоровье топика.
В следующих уроках Модуля 8 мы перейдём к AWS Aurora MySQL — managed service с собственными особенностями CDC:
- Параметр-группы Aurora для binlog конфигурации
- Ограничения привилегий в RDS/Aurora
- Read replica поддержка для CDC
- Aurora-specific monitoring и troubleshooting
Aurora MySQL использует те же принципы schema history, но имеет уникальные ограничения managed services. Prepare to dive deep.
Проверьте понимание
Закончили урок?
Отметьте его как пройденный, чтобы отслеживать свой прогресс