Prerequisites:
- module-8/06-schema-history-recovery
- module-8/03-binlog-retention-heartbeat
Recovery Procedures: Binlog Loss и Schema History Corruption
Production Reality: Failures Are Inevitable
You configured the right retention. You configured heartbeats. You monitor lag. And still, failures happen:
- Binlog purge after unexpected downtime (a Kafka cluster failure, a network split, a prolonged deploy)
- Schema history topic corruption caused by a Kafka retention policy, accidental deletion, or a broker failure
- A connector crash during a DDL operation, leaving a partial write in the schema history
- Manual mistakes: an accidental PURGE BINARY LOGS, or a retention change made without analyzing the consequences
In this lesson we will learn how to diagnose and recover from the two most critical MySQL CDC failures in production:
- Binlog Position Loss: the connector requires a binlog file that has already been purged
- Schema History Topic Corruption: the schema history topic is fully or partially missing
We will cover a decision tree for diagnosis, step-by-step recovery workflows, defense-in-depth prevention strategies, and backup procedures for fast restores.
Prevention vs Recovery Trade-off
Prevention is always cheaper than recovery:
- Infinite retention on the schema history topic: free (a few MB of disk space)
- Recovery from a corrupted schema history: hours of downtime plus a resnapshot
- Adequate binlog retention: pennies of disk space
- Recovery from a purged binlog: resnapshotting a 500GB table takes 6+ hours
But even with perfect prevention, you need to know the recovery procedures for edge cases.
Recovery Decision Tree
When a connector fails, the first step is to diagnose the failure type from the error message.
The decision tree, in short:
1. Connector status: FAILED → read the error message.
2. "file not available" → binlog position loss: check the offset topic, compare positions, recover with when_needed.
3. "partially missing" → schema history corruption: determine whether any DDL ran since the last offset, then choose a recovery path.
4. "column mismatch" → schema mismatch, usually a downstream symptom of a schema history issue.
| Error Pattern | Scenario | Recovery Path |
|---|---|---|
| binlog file not available | Binlog Position Loss | when_needed / initial |
| history topic missing | Schema History Corruption | recovery / backup / fresh |
| table not found in schema | Schema Mismatch | See Schema History |
Error message patterns for quick diagnosis:
| Error Pattern | Scenario | Lesson Reference |
|---|---|---|
| Connector requires binlog file 'mysql-bin.001234', but MySQL only has mysql-bin.001456 | Binlog Position Loss | See Lesson 03 (Binlog Retention) |
| The db history topic or its content is fully or partially missing | Schema History Corruption | See Lesson 06 (Schema History) |
| Table 'customers' not found in schema history | Schema Mismatch | Usually a consequence of Scenario 2 |
| Column count doesn't match value count at row 1 | Schema Mismatch | Usually a consequence of Scenario 2 |
Scenario 1: Binlog Position Loss
Symptoms
The connector fails at startup or during streaming with:
ERROR: Error during binlog reading
io.debezium.DebeziumException: The connector is trying to read binlog starting at
binlog file 'mysql-bin.000134', pos=4567890, but this file no longer exists on the server.
Or via the MySQL replication protocol:
ERROR 1236 (HY000): Could not find first log file name in binary log index file
Causes
- Binlog retention expired: the connector was offline longer than binlog_expire_logs_seconds
- Manual PURGE BINARY LOGS: a DBA deleted old binlog files by hand to free disk space
- MySQL restart with aggressive retention: binlog_expire_logs_seconds was lowered without analysis
- Long-running snapshot: the snapshot takes longer than the binlog retention, so the position is purged before the snapshot completes (see Lesson 03, Pitfall 2)
Diagnosis Steps
Step 1: Check the available binlog files
Connect to MySQL:
docker compose exec mysql mysql -u root -p
List the binlog files:
SHOW BINARY LOGS;
The output shows the oldest available file:
+------------------+-----------+-----------+
| Log_name | File_size | Encrypted |
+------------------+-----------+-----------+
| mysql-bin.000255 | 1024 | No |
| mysql-bin.000256 | 2048 | No |
| mysql-bin.000257 | 4096 | No |
+------------------+-----------+-----------+
Oldest file: mysql-bin.000255
Step 2: Check the required position in the offset topic
kafka-console-consumer \
--bootstrap-server kafka:9092 \
--topic connect-offsets \
--from-beginning \
--property print.key=true \
| grep mysql-connector
Find the latest offset for your connector:
["mysql-connector",{"server":"mysql-server"}]
{
"file": "mysql-bin.000134",
"pos": 4567890,
"gtids": "3e11fa47-71ca-11e1-9e33-c80aa9429562:1-150"
}
Required file: mysql-bin.000134
Шаг 3: Сравнить positions
Required: mysql-bin.000134 (from offset)
Available: mysql-bin.000255 (oldest on MySQL)
000134 < 000255 → Position LOST
The connector cannot continue: binlog files 000134-000254 are already purged.
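The comparison in Step 3 can be scripted. A minimal sketch (file names are the examples from this lesson; the helper functions are illustrative, not part of Debezium):

```python
# Sketch: decide whether the binlog position is lost by comparing the numeric
# suffix of the file the connector needs with the oldest file MySQL still has.
def binlog_index(name: str) -> int:
    # 'mysql-bin.000134' -> 134
    return int(name.rsplit(".", 1)[1])

def position_lost(required: str, oldest_available: str) -> bool:
    # Position is lost if the required file is older than the oldest kept file.
    return binlog_index(required) < binlog_index(oldest_available)

print(position_lost("mysql-bin.000134", "mysql-bin.000255"))  # True -> position LOST
```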
Step 4: Estimate the data gap
If GTID mode is enabled, check the purged GTID set:
SHOW GLOBAL VARIABLES LIKE 'gtid_purged';
Output:
+---------------+------------------------------------------+
| Variable_name | Value |
+---------------+------------------------------------------+
| gtid_purged | 3e11fa47-71ca-11e1-9e33-c80aa9429562:1-200 |
+---------------+------------------------------------------+
Compare with the offset GTID (1-150):
- Offset: up to 150
- Purged: up to 200
- Gap: GTIDs 151-200 are lost
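For the single-UUID case above, the gap can be computed mechanically. A sketch under that assumption (real GTID sets can contain multiple UUIDs and multiple intervals, which this does not handle):

```python
def interval_end(gtid_set: str) -> int:
    # '3e11fa47-...-c80aa9429562:1-150' -> 150 (last transaction id)
    interval = gtid_set.rsplit(":", 1)[1]
    return int(interval.split("-")[-1])

offset_gtids = "3e11fa47-71ca-11e1-9e33-c80aa9429562:1-150"  # from connect-offsets
purged_gtids = "3e11fa47-71ca-11e1-9e33-c80aa9429562:1-200"  # from gtid_purged

lost = interval_end(purged_gtids) - interval_end(offset_gtids)
print(f"Gap: GTIDs {interval_end(offset_gtids) + 1}-{interval_end(purged_gtids)} lost ({lost} transactions)")
```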
Knowledge check: How do you diagnose binlog position loss and determine how much data was lost?
Recovery Procedure
Warning: when_needed may produce duplicate events
During the automatic snapshot the connector reads the current state of the tables. If INSERTs happened between the last offset and now, you will receive duplicate events for those rows:
- The event from streaming before the position loss
- The same event from the snapshot after recovery
Downstream consumers must be idempotent or have deduplication logic.
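One way to get that idempotency is to track the highest source position already applied per primary key. A sketch (the class and key format are illustrative; the file/position values follow Debezium's `source.file`/`source.pos` fields):

```python
class DedupingApplier:
    """Skips events at or before the position already applied for a given key."""

    def __init__(self):
        self.applied = {}  # key -> (binlog file, position) last applied

    def should_apply(self, key: str, file: str, pos: int) -> bool:
        # Zero-padded binlog file names compare correctly as strings,
        # so (file, pos) tuples order events by source position.
        position = (file, pos)
        prev = self.applied.get(key)
        if prev is not None and position <= prev:
            return False  # duplicate (e.g. snapshot re-read) -> skip
        self.applied[key] = position
        return True

applier = DedupingApplier()
print(applier.should_apply("customers:100", "mysql-bin.000134", 4567890))  # True
print(applier.should_apply("customers:100", "mysql-bin.000134", 4567890))  # False (duplicate)
```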
Option A: Automatic Recovery with snapshot.mode=when_needed
When to use: production deployments where minimal manual intervention is desired.
How it works: the connector automatically detects that the binlog position is unavailable and triggers a snapshot (see Lesson 09 for snapshot mechanics).
Configuration:
{
"name": "mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"database.server.name": "mysql-server",
"table.include.list": "inventory.*",
"snapshot.mode": "when_needed",
"schema.history.internal.kafka.topic": "schema-history.mysql-connector",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"heartbeat.interval.ms": "10000",
"heartbeat.action.query": "INSERT INTO inventory.debezium_heartbeat (id, ts) VALUES (1, NOW()) ON DUPLICATE KEY UPDATE ts = NOW()"
}
}
Apply the configuration:
curl -X PUT http://localhost:8083/connectors/mysql-connector/config \
-H "Content-Type: application/json" \
-d @mysql-connector-config.json
Restart the connector:
curl -X POST http://localhost:8083/connectors/mysql-connector/restart
Monitor the snapshot progress:
# Check the connector status
curl http://localhost:8083/connectors/mysql-connector/status | jq
# Watch the logs
docker compose logs -f connect | grep -i snapshot
You will see messages like:
INFO Snapshot mode is 'when_needed', determining if snapshot is required
INFO Snapshot is needed due to unavailable binlog position
INFO Starting snapshot for 3 table(s) in database inventory
INFO Step 1: acquiring global read lock to prevent writes to database
INFO Step 2: reading schema for captured tables
INFO Step 3: snapshotting data for table inventory.customers
...
INFO Snapshot completed successfully
INFO Transitioning to streaming mode from binlog position mysql-bin.000257:1024
Recovery time: proportional to the data size (a 1TB table takes roughly several hours).
Option B: Manual Reset with snapshot.mode=initial
When to use: when you need a fresh start, or when when_needed does not work (edge cases).
Step 1: Stop and delete the connector
curl -X DELETE http://localhost:8083/connectors/mysql-connector
Step 2: Clear the offset topic (optional)
If a connector is recreated under the same name, Kafka Connect will try to reuse the old offset. To force a fresh start, the offset must be deleted.
# Find the offset key to delete
kafka-console-consumer \
--bootstrap-server kafka:9092 \
--topic connect-offsets \
--from-beginning \
--property print.key=true \
| grep mysql-connector
# Delete the offset via a tombstone record (advanced)
# Or recreate the connector under a new name
Simpler: create the connector under a new name
# New name: mysql-connector-v2
Step 3: Create the connector with snapshot.mode=initial
{
"name": "mysql-connector-v2",
"config": {
"snapshot.mode": "initial",
...
}
}
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @mysql-connector-v2-config.json
Step 4: Monitor the snapshot
See Option A for monitoring.
Recovery time: same as Option A (snapshot time proportional to data size).
Prevention
Layer 1: Adequate Binlog Retention (see Lesson 03)
Formula:
binlog_expire_logs_seconds = Max(
max_expected_downtime,
snapshot_duration
) * safety_margin
For a 500GB table with a 6-hour snapshot:
Minimum retention = 6 hours * 1.5 (safety) = 9 hours
Recommended = 14+ days (336+ hours)
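The formula above can be sketched as a small calculator (the hour values are this lesson's examples; the function name is illustrative):

```python
def binlog_retention_seconds(max_downtime_hours: float,
                             snapshot_hours: float,
                             safety_margin: float = 1.5) -> int:
    """binlog_expire_logs_seconds = max(downtime, snapshot) * safety, in seconds."""
    return int(max(max_downtime_hours, snapshot_hours) * safety_margin * 3600)

# 500GB table, 6-hour snapshot, up to 4 hours of expected downtime
# -> 9-hour minimum retention
print(binlog_retention_seconds(max_downtime_hours=4, snapshot_hours=6))  # 32400
```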
Layer 2: Heartbeat Events (see Lesson 03)
Ensure the offset keeps advancing even on idle tables:
{
"heartbeat.interval.ms": "10000",
"heartbeat.action.query": "INSERT INTO inventory.debezium_heartbeat (id, ts) VALUES (1, NOW()) ON DUPLICATE KEY UPDATE ts = NOW()"
}
Layer 3: Lag Monitoring (see Lesson 10)
Alert on:
- alert: DebeziumBinlogLagHigh
expr: debezium_metrics_MilliSecondsBehindSource > 432000000 # 5 days
for: 1h
Layer 4: High Availability Kafka Connect
Kafka Connect in distributed mode with multiple workers:
- Automatic connector failover при worker crash
- Reduces downtime window
Scenario 2: Schema History Topic Corruption
Symptoms
The connector fails at startup with:
ERROR: The db history topic 'schema-history.mysql-connector' is fully or partially missing
Unable to restore history from Kafka
Or during streaming:
ERROR: Exception while processing event
org.apache.kafka.common.errors.SerializationException: Error deserializing schema history event
Or with schema mismatch errors:
ERROR: Table 'customers' not found in schema history
ERROR: Column 'phone' doesn't exist (but it exists in DB)
Causes
- Kafka retention purged the topic: the default 7-day retention deleted old DDL records (see Lesson 06, "the 7-day bomb")
- Accidental topic deletion: kafka-topics --delete run by mistake
- Broker failure with data loss: a Kafka broker crashed before replication completed
- Partial write during DDL: the connector crashed while writing a DDL event, leaving incomplete JSON in the topic
Diagnosis Steps
Step 1: Check that the topic exists
kafka-topics --bootstrap-server kafka:9092 \
--list | grep schema-history
If the topic is missing:
(no output) → Topic deleted
Step 2: Check the retention configuration
kafka-configs --bootstrap-server kafka:9092 \
--entity-type topics \
--entity-name schema-history.mysql-connector \
--describe
Expected output:
retention.ms=-1
retention.bytes=-1
If instead you see:
retention.ms=604800000 # 7 days
Problem found: retention purged the topic.
Step 3: Count the messages in the topic
kafka-run-class kafka.tools.GetOffsetShell \
--bootstrap-server kafka:9092 \
--topic schema-history.mysql-connector
Output:
schema-history.mysql-connector:0:15
15 messages in the topic.
Compare with the expected number of DDL operations:
- If you executed 50 DDLs but the topic holds only 15 → partial loss
- If the topic is empty (0 messages) → complete loss
Step 4: Read the topic content
kafka-console-consumer \
--bootstrap-server kafka:9092 \
--topic schema-history.mysql-connector \
--from-beginning \
--timeout-ms 5000
Check for:
- Incomplete JSON messages (corruption)
- Missing DDL events for known tables
- Duplicate events (edge case)
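The exported dump can be checked for truncated JSON mechanically. A sketch that assumes the tab-separated `print.key`/`print.timestamp` output format used elsewhere in this lesson, with the payload in the last column:

```python
import json

def broken_json_lines(dump_lines):
    """Return 1-based line numbers whose payload is not valid JSON."""
    bad = []
    for n, line in enumerate(dump_lines, 1):
        payload = line.split("\t")[-1].strip()  # drop timestamp/key columns
        if not payload or payload == "null":
            continue  # nothing to validate
        try:
            json.loads(payload)
        except json.JSONDecodeError:
            bad.append(n)  # symptom of a partial write
    return bad

sample = [
    'CreateTime:1706745600000\tnull\t{"ddl":"CREATE DATABASE `inventory`"}',
    'CreateTime:1706745601000\tnull\t{"ddl":"CREATE TABLE `cust',  # truncated
]
print(broken_json_lines(sample))  # [2]
```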
Recovery Procedure
CRITICAL: Verify No DDL Before Using recovery Mode
snapshot.mode=recovery reads the current DB schema and assumes it matches the schema at the moment of the last offset.
If DDL changes happened between the last offset and now, the connector will apply the wrong schema to events → silent data corruption in downstream consumers.
Example of a disaster:
- Last offset: mysql-bin.000100, pos 5000
- Between 5000 and 10000: ALTER TABLE customers DROP COLUMN email
- Recovery mode reads the current schema (without email)
- The connector processes events 5000-10000 with the current schema
- Events carrying email data are processed as if email never existed
- Data loss in consumers
Always check the DDL history before using recovery mode!
Knowledge check: Why is snapshot.mode=recovery dangerous if DDL changes happened between the last offset and the present moment?
Prerequisite: DDL Verification
Before using snapshot.mode=recovery, always verify whether any DDL changes happened since the last offset.
Step 1: Find the last offset position
kafka-console-consumer \
--bootstrap-server kafka:9092 \
--topic connect-offsets \
--from-beginning \
--property print.key=true \
| grep mysql-connector
Result:
{
"file": "mysql-bin.000255",
"pos": 4567890
}
Step 2: Check the binlog for DDL events from that position
Connect to MySQL:
docker compose exec mysql mysql -u root -p
Look for DDL events:
SHOW BINLOG EVENTS IN 'mysql-bin.000255' FROM 4567890 LIMIT 1000;
Or across several files:
SHOW BINLOG EVENTS IN 'mysql-bin.000255';
SHOW BINLOG EVENTS IN 'mysql-bin.000256';
SHOW BINLOG EVENTS IN 'mysql-bin.000257';
Look for Event_type Query with DDL statements: CREATE, ALTER, DROP, RENAME, TRUNCATE.
Example DDL event:
+------------------+------+------------+-----------+-------------+---------------------------------------+
| Log_name | Pos | Event_type | Server_id | End_log_pos | Info |
+------------------+------+------------+-----------+-------------+---------------------------------------+
| mysql-bin.000255 | 5000 | Query | 1 | 5100 | ALTER TABLE customers ADD COLUMN phone|
+------------------+------+------------+-----------+-------------+---------------------------------------+
If you found DDL: do NOT use recovery mode → go to Option C (Fresh Connector).
If there is NO DDL: recovery mode is safe → go to Option A.
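The manual scan of SHOW BINLOG EVENTS output can be automated with a simple keyword filter over the Info column. A sketch (the keyword list mirrors the one above and is not exhaustive; it also ignores edge cases such as leading comments):

```python
import re

# DDL keywords from this lesson's verification step
DDL_PATTERN = re.compile(r"^\s*(CREATE|ALTER|DROP|RENAME|TRUNCATE)\b", re.IGNORECASE)

def find_ddl(infos):
    """Return the Info values that look like DDL statements."""
    return [info for info in infos if DDL_PATTERN.match(info)]

events = [
    "BEGIN",
    "INSERT INTO customers VALUES (1, 'a')",
    "ALTER TABLE customers ADD COLUMN phone VARCHAR(20)",
    "COMMIT",
]
ddl = find_ddl(events)
print(ddl)        # ['ALTER TABLE customers ADD COLUMN phone VARCHAR(20)']
print(bool(ddl))  # True -> do NOT use recovery mode
```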
Option A: snapshot.mode=recovery (If No DDL)
When to use: the schema history topic is corrupted or lost, but no DDL changes happened since the last offset.
How it works: the connector reads the current database schema and rebuilds the schema history topic.
Configuration:
{
"name": "mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"database.server.name": "mysql-server",
"table.include.list": "inventory.*",
"snapshot.mode": "recovery",
"schema.history.internal.kafka.topic": "schema-history.mysql-connector",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092"
}
}
Apply the configuration:
curl -X PUT http://localhost:8083/connectors/mysql-connector/config \
-H "Content-Type: application/json" \
-d @mysql-connector-recovery-config.json
Restart the connector:
curl -X POST http://localhost:8083/connectors/mysql-connector/restart
Monitor the recovery:
docker compose logs -f connect | grep -i recovery
You will see:
INFO Snapshot mode is 'recovery', recovering database schema history
INFO Reading current schema for table inventory.customers
INFO Reading current schema for table inventory.orders
INFO Schema history topic rebuilt with 3 tables
INFO Recovery completed, transitioning to streaming mode
CRITICAL: once recovery is complete, switch snapshot.mode back:
{
"snapshot.mode": "when_needed"
}
curl -X PUT http://localhost:8083/connectors/mysql-connector/config \
-H "Content-Type: application/json" \
-d '{"snapshot.mode": "when_needed"}'
Why: snapshot.mode=recovery is intended for one-time recovery only, not for normal operation. If you leave it on, you risk silent failures on the next restart.
Recovery time: seconds to minutes (it reads only metadata, not data).
Option B: Restore from Backup (Fastest)
When to use: you have a recent backup of the schema history topic.
How it works: restore the backup into a new topic, and the connector resumes normally.
Step 1: Create a new schema history topic (if it was deleted)
kafka-topics --bootstrap-server kafka:9092 \
--create \
--topic schema-history.mysql-connector \
--partitions 1 \
--replication-factor 3 \
--config retention.ms=-1 \
--config retention.bytes=-1
Step 2: Restore from the backup
# Unzip the backup if it is gzipped
gunzip schema-history-backup-2026-01-31.json.gz
# Restore the messages
# IMPORTANT: strip the timestamps/keys from the backup, keep only the JSON payloads
cat schema-history-backup-2026-01-31.json | \
awk -F'\t' '{print $3}' | \
kafka-console-producer \
--bootstrap-server kafka:9092 \
--topic schema-history.mysql-connector
Step 3: Verify the restore
kafka-run-class kafka.tools.GetOffsetShell \
--bootstrap-server kafka:9092 \
--topic schema-history.mysql-connector
The message count should match the backup.
Step 4: Restart the connector
curl -X POST http://localhost:8083/connectors/mysql-connector/restart
Recovery time: 1-5 minutes (versus hours for a resnapshot).
Option C: Fresh Start with New Connector (If DDL Happened)
When to use: DDL changes happened since the last offset, so recovery mode is unsafe.
Step 1: Delete the old connector
curl -X DELETE http://localhost:8083/connectors/mysql-connector
Step 2: Delete the corrupted schema history topic
kafka-topics --bootstrap-server kafka:9092 \
--delete \
--topic schema-history.mysql-connector
Step 3: Create a new schema history topic
kafka-topics --bootstrap-server kafka:9092 \
--create \
--topic schema-history.mysql-connector-v2 \
--partitions 1 \
--replication-factor 3 \
--config retention.ms=-1 \
--config retention.bytes=-1
Step 4: Create the connector with a new name and snapshot.mode=initial
{
"name": "mysql-connector-v2",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184055",
"database.server.name": "mysql-server",
"table.include.list": "inventory.*",
"snapshot.mode": "initial",
"schema.history.internal.kafka.topic": "schema-history.mysql-connector-v2",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092"
}
}
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @mysql-connector-v2-config.json
Consequences:
- A full resnapshot is required
- Downstream consumers receive duplicate events (snapshot + streaming)
- Idempotent consumers or manual deduplication are required
Recovery time: proportional to the data size (hours for large tables).
Prevention: Defense in Depth
The incident workflow, in short: connector FAILED (CRITICAL) → check the logs, binlog, and schema history → pick a recovery path based on the diagnosis → test before resuming → monitor for at least 1 hour.
Risks during recovery:
- Data duplication (partial snapshot + streaming)
- Offset corruption (race condition)
- Schema history conflict (concurrent writes)
| Recovery Method | Time Estimate | When to Use |
|---|---|---|
| Restore from backup | 1-5 minutes | Backup exists, schema history corruption |
| snapshot.mode=recovery | Seconds-minutes | No DDL since last offset |
| snapshot.mode=when_needed | Hours (data-dependent) | Binlog position loss |
| Fresh connector + initial | Hours (data-dependent) | DDL happened, no other option |
Layer 1: Infinite Retention on the Schema History Topic
# After the connector has created the topic, check its retention
kafka-configs --bootstrap-server kafka:9092 \
--entity-type topics \
--entity-name schema-history.mysql-connector \
--describe
# If it is not set, add it
kafka-configs --bootstrap-server kafka:9092 \
--entity-type topics \
--entity-name schema-history.mysql-connector \
--alter \
--add-config retention.ms=-1,retention.bytes=-1
Why both parameters:
- retention.ms=-1: time-based retention disabled
- retention.bytes=-1: size-based retention disabled
If you set only retention.ms, the broker-level log.retention.bytes can still purge the topic.
Layer 2: Adequate Binlog Retention
The formula from Lesson 03:
binlog_expire_logs_seconds = Max(
snapshot_duration,
max_expected_downtime
) * 1.5 (safety margin)
For a 500GB table:
- Snapshot duration: ~6 hours
- Safety margin: 1.5x
- Minimum: 9 hours (32400 seconds)
- Recommended: 14+ days (1209600+ seconds)
SET GLOBAL binlog_expire_logs_seconds = 1209600;
Make it persistent in my.cnf:
[mysqld]
binlog_expire_logs_seconds = 1209600
Layer 3: Regular Backups
See the "Schema History Backup and Restore" section below.
Layer 4: Monitoring
Prometheus alerting rules:
groups:
- name: debezium_schema_history
rules:
- alert: SchemaHistoryRetentionIncorrect
expr: kafka_topic_config{topic=~"schema-history.*",key="retention.ms"} != -1
for: 5m
annotations:
summary: "Schema history topic has finite retention (CRITICAL)"
- alert: SchemaHistoryMessageCountDrop
expr: delta(kafka_log_size{topic=~"schema-history.*"}[1h]) < -1
for: 5m
annotations:
summary: "Schema history topic size decreased (possible purge)"
- alert: DebeziumOffsetAgeTooOld
expr: (time() - debezium_metrics_LastEventTimestamp) > 432000 # 5 days
for: 1h
annotations:
summary: "Debezium offset age approaching binlog retention"
Schema History Backup and Restore
Back up before connector upgrades
Always back up the schema history topic before:
- A Debezium connector version upgrade
- Major DDL migrations
- A Kafka Connect upgrade
Recovery from a backup takes minutes. A resnapshot after corruption takes hours.
Backup Procedure
Manual backup (on-demand):
# Export the schema history topic to a file
kafka-console-consumer \
--bootstrap-server kafka:9092 \
--topic schema-history.mysql-connector \
--from-beginning \
--property print.key=true \
--property print.timestamp=true \
--timeout-ms 30000 \
> schema-history-backup-$(date +%Y%m%d-%H%M%S).json
# Compress to save disk space
gzip schema-history-backup-*.json
Automated backup script:
#!/bin/bash
# /usr/local/bin/backup-debezium-schema-history.sh
KAFKA_BOOTSTRAP="kafka:9092"
BACKUP_DIR="/backups/debezium-schema-history"
DATE=$(date +%Y%m%d-%H%M%S)
# List all schema history topics
TOPICS=$(kafka-topics --bootstrap-server "$KAFKA_BOOTSTRAP" --list | grep "^schema-history\.")
mkdir -p "$BACKUP_DIR"
for TOPIC in $TOPICS; do
echo "Backing up $TOPIC..."
kafka-console-consumer \
--bootstrap-server "$KAFKA_BOOTSTRAP" \
--topic "$TOPIC" \
--from-beginning \
--property print.key=true \
--property print.timestamp=true \
--timeout-ms 30000 \
> "$BACKUP_DIR/${TOPIC}-${DATE}.json"
gzip "$BACKUP_DIR/${TOPIC}-${DATE}.json"
echo "Backup saved: $BACKUP_DIR/${TOPIC}-${DATE}.json.gz"
done
# Delete backups older than 30 days
find "$BACKUP_DIR" -name "schema-history-*.json.gz" -mtime +30 -delete
echo "Backup completed: $(date)"
Cron schedule (daily at 2 AM):
0 2 * * * /usr/local/bin/backup-debezium-schema-history.sh >> /var/log/debezium-backup.log 2>&1
Backup frequency recommendations:
| Environment | Frequency | Retention |
|---|---|---|
| Production (active DDL) | Daily + before schema changes | 30+ days |
| Production (stable) | Weekly | 60+ days |
| Staging | Before major changes | 14 days |
| Dev | Before connector upgrades | 7 days |
Restore Procedure
Step 1: Prepare the backup file
# Unzip the backup
gunzip schema-history-backup-20260201-140000.json.gz
# Inspect format
head -3 schema-history-backup-20260201-140000.json
Backup format:
CreateTime:1706745600000 null {"ddl":"CREATE DATABASE `inventory`",...}
CreateTime:1706745601000 null {"ddl":"CREATE TABLE `customers`",...}
CreateTime:1706745602000 null {"ddl":"ALTER TABLE `customers`",...}
Step 2: Extract the JSON payloads (strip the timestamps and keys)
cat schema-history-backup-20260201-140000.json | \
awk -F'\t' '{print $3}' \
> schema-history-payloads.json
Step 3: Create the schema history topic (if it was deleted)
kafka-topics --bootstrap-server kafka:9092 \
--create \
--topic schema-history.mysql-connector \
--partitions 1 \
--replication-factor 3 \
--config retention.ms=-1 \
--config retention.bytes=-1
Step 4: Restore the payloads into the topic
kafka-console-producer \
--bootstrap-server kafka:9092 \
--topic schema-history.mysql-connector \
< schema-history-payloads.json
Step 5: Verify the restore
# Count the messages
kafka-run-class kafka.tools.GetOffsetShell \
--bootstrap-server kafka:9092 \
--topic schema-history.mysql-connector
# Read the first and the last message
kafka-console-consumer \
--bootstrap-server kafka:9092 \
--topic schema-history.mysql-connector \
--from-beginning \
--max-messages 1
kafka-console-consumer \
--bootstrap-server kafka:9092 \
--topic schema-history.mysql-connector \
--partition 0 \
--offset latest \
--max-messages 1
Step 6: Restart the connector
curl -X POST http://localhost:8083/connectors/mysql-connector/restart
Monitor the startup:
docker compose logs -f connect | grep mysql-connector
Expected messages:
INFO Reading schema history from Kafka topic schema-history.mysql-connector
INFO Schema history recovered with 15 DDL events
INFO Starting streaming from binlog position mysql-bin.000255:4567890
Recovery Time Estimates
These estimates help plan downtime for the various recovery scenarios.
| Recovery Method | Time Estimate | Data Dependency | Downside |
|---|---|---|---|
| snapshot.mode=when_needed | Proportional to data size 1TB = ~4-8 hours | Yes | May produce duplicate events during snapshot |
| snapshot.mode=recovery | Seconds to minutes (reads schema only, not data) | No | DANGEROUS if DDL happened since last offset |
| Restore from backup | Seconds to minutes (Kafka topic restore) | No | Requires recent backup exists |
| Fresh connector (snapshot.mode=initial) | Snapshot time + downstream reprocessing 1TB = ~4-8 hours + consumer lag | Yes | Duplicate events in consumers, requires idempotency |
| Manual offset reset | Minutes (if offset position known) | No | Risk of duplicate/missing events if wrong |
Example planning for a 500GB database:
- Best case (backup exists):
  - Restore schema history: 2 minutes
  - Connector restart: 30 seconds
  - Total downtime: ~3 minutes
- Moderate case (recovery mode safe):
  - DDL verification: 5 minutes
  - Recovery mode execution: 1 minute
  - Connector resume: 30 seconds
  - Total downtime: ~7 minutes
- Worst case (resnapshot required):
  - Snapshot of 500GB: ~3-6 hours
  - Downstream consumer catch-up: ~1 hour
  - Total downtime: ~4-7 hours
Planning recommendation: aim for the best case through regular backups.
Common Mistakes During Recovery
Top 5 Recovery Mistakes
- Using recovery mode when DDL happened → silent data corruption
- Forgetting to change snapshot.mode back after recovery → connector stays in recovery mode
- Not verifying recovery before resuming downstream → propagating corruption
- Skipping backup verification → discovering backup is corrupt during emergency
- Deleting old connector before verifying new one works → losing offset/schema history
Mistake 1: Recovery Mode with DDL Changes
What happens:
-- Last offset: mysql-bin.000100, pos 5000
-- A developer runs DDL:
ALTER TABLE customers ADD COLUMN loyalty_points INT DEFAULT 0;
-- The DBA sees the schema history topic is corrupted
-- and uses snapshot.mode=recovery WITHOUT checking for DDL
Consequences:
- The connector reads the current schema (with loyalty_points)
- It applies that schema to events from BEFORE the DDL
- Events without loyalty_points are interpreted as if they had loyalty_points
- Consumers receive corrupt data
The right approach:
Always check the DDL history via SHOW BINLOG EVENTS (see DDL Verification above).
Mistake 2: Leaving Recovery Mode Active
What happens:
# Recovery is performed
curl -X PUT http://localhost:8083/connectors/mysql-connector/config \
  -d '{"snapshot.mode": "recovery"}'
# The connector restarts, recovery succeeds
# The administrator forgets to switch snapshot.mode back
# A week later: the connector restarts for an unrelated reason
# Recovery mode triggers again → an unnecessary schema rebuild
The right approach:
Once recovery is complete:
curl -X PUT http://localhost:8083/connectors/mysql-connector/config \
-d '{"snapshot.mode": "when_needed"}'
Mistake 3: Not Verifying Recovery
Что происходит:
- Recovery completed
- Connector status: RUNNING
- Администратор assumes всё работает
- Downstream consumers начинают обрабатывать events
- Hours later: обнаруживаются schema mismatch errors в consumer logs
The right approach:
After recovery:
1. Verify the connector is streaming:
curl http://localhost:8083/connectors/mysql-connector/status | jq .connector.state
# Should be: "RUNNING"
2. Verify events are flowing:
kafka-console-consumer \
  --bootstrap-server kafka:9092 \
  --topic mysql-server.inventory.customers \
  --max-messages 5
3. Test the schema in consumers:
# Trigger a test INSERT
docker compose exec mysql mysql -u root -p -e "INSERT INTO inventory.customers VALUES (9999, 'Test Recovery', '[email protected]')"
# Verify the consumer processes it without errors
docker compose logs -f consumer-app | grep "Test Recovery"
4. Monitor for 1 hour before declaring the recovery successful.
Mistake 4: Untested Backups
What happens:
- The backup script runs daily
- Backups accumulate in storage
- An emergency happens
- The administrator tries to restore
- The backup file is corrupted / the format has changed / it is incomplete
The right approach:
A monthly backup restore drill:
# 1. Create test schema history topic
kafka-topics --bootstrap-server kafka:9092 \
--create \
--topic schema-history.test-restore \
--partitions 1 \
--replication-factor 1 \
--config retention.ms=-1
# 2. Restore the latest backup into the test topic
cat schema-history-backup-latest.json | \
awk -F'\t' '{print $3}' | \
kafka-console-producer \
--bootstrap-server kafka:9092 \
--topic schema-history.test-restore
# 3. Verify message count
kafka-run-class kafka.tools.GetOffsetShell \
--bootstrap-server kafka:9092 \
--topic schema-history.test-restore
# 4. Spot-check content
kafka-console-consumer \
--bootstrap-server kafka:9092 \
--topic schema-history.test-restore \
--from-beginning \
--max-messages 10
# 5. Clean up test topic
kafka-topics --bootstrap-server kafka:9092 \
--delete \
--topic schema-history.test-restore
Document the drill results in the runbook.
Mistake 5: Deleting Before Verifying
What happens:
# Recovery scenario: the schema history is corrupted
# The administrator creates a new connector, mysql-connector-v2
curl -X POST http://localhost:8083/connectors -d @mysql-connector-v2.json
# ...and immediately deletes the old connector
curl -X DELETE http://localhost:8083/connectors/mysql-connector
# The new connector fails to start (config error)
# The old connector is deleted → its offset topic entry is gone
# Impossible to roll back
The right approach:
- Create the new connector (mysql-connector-v2)
- Verify the new connector is RUNNING and streaming events
- Monitor for 1+ hours
- Pause the old connector (do not delete it)
- After 24 hours of success: delete the old connector
This gives you a safety net for rollback.
Hands-on Exercise: Simulated Recovery
Practical exercises for drilling the recovery procedures in a safe environment.
Exercise 1: Binlog Purge Recovery
Goal: simulate a binlog purge and recover with snapshot.mode=when_needed.
Step 1: Start the connector in normal mode
# If the connector does not exist yet
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"database.server.name": "mysql-server",
"table.include.list": "inventory.*",
"snapshot.mode": "when_needed",
"schema.history.internal.kafka.topic": "schema-history.mysql-connector",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092"
}
}'
Step 2: Create test data
docker compose exec mysql mysql -u root -pmysql -e "
USE inventory;
INSERT INTO customers VALUES (100, 'Alice', '[email protected]');
INSERT INTO customers VALUES (101, 'Bob', '[email protected]');
"
Step 3: Check the current binlog position
docker compose exec mysql mysql -u root -pmysql -e "SHOW MASTER STATUS\G"
Note the File and Position.
Step 4: Force a binlog rotation
docker compose exec mysql mysql -u root -pmysql -e "FLUSH BINARY LOGS"
Step 5: Pause the connector
curl -X PUT http://localhost:8083/connectors/mysql-connector/pause
Step 6: Simulate a binlog purge
# Check the available binlog files
docker compose exec mysql mysql -u root -pmysql -e "SHOW BINARY LOGS"
# Manually purge the older files (keep only the latest)
docker compose exec mysql mysql -u root -pmysql -e "PURGE BINARY LOGS TO 'mysql-bin.000005'"
Step 7: Resume the connector (triggers the position loss)
curl -X PUT http://localhost:8083/connectors/mysql-connector/resume
Step 8: Observe the error in the logs
docker compose logs -f connect | grep -i "binlog"
You should see:
ERROR: Connector requires binlog file 'mysql-bin.000003', but MySQL only has mysql-bin.000005
Step 9: Verify that snapshot.mode=when_needed triggers recovery
docker compose logs -f connect | grep -i snapshot
You should see:
INFO: Snapshot is needed due to unavailable binlog position
INFO: Starting snapshot for tables in database inventory
Step 10: Wait for snapshot completion
curl http://localhost:8083/connectors/mysql-connector/status | jq .connector.state
When the state is "RUNNING", the snapshot has completed.
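Rather than re-running the status call by hand, the wait can be scripted. A sketch, assuming the Connect REST endpoint and connector name from this lesson, and that jq is available:

```shell
# Poll the Connect REST API until the connector reports RUNNING, or give up.
wait_until_running() {
  name="$1"; tries="${2:-60}"
  i=1
  while [ "$i" -le "$tries" ]; do
    state=$(curl -s "http://localhost:8083/connectors/$name/status" | jq -r .connector.state)
    if [ "$state" = "RUNNING" ]; then
      echo "RUNNING after $i checks"
      return 0
    fi
    sleep 10
    i=$((i + 1))
  done
  echo "timed out waiting for $name" >&2
  return 1
}

# wait_until_running mysql-connector
```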
Step 11: Verify streaming resumed
# Insert new data
docker compose exec mysql mysql -u root -pmysql -e "
INSERT INTO inventory.customers VALUES (102, 'Charlie', '[email protected]');
"
# Check Kafka topic
kafka-console-consumer \
--bootstrap-server kafka:9092 \
--topic mysql-server.inventory.customers \
--from-beginning \
--max-messages 10
Expected result: the connector recovered automatically and streaming resumed.
Exercise 2: Backup and Restore Schema History
Goal: practice the backup and restore workflow.
Step 1: Create a backup
kafka-console-consumer \
--bootstrap-server kafka:9092 \
--topic schema-history.mysql-connector \
--from-beginning \
--property print.key=true \
--property print.timestamp=true \
--timeout-ms 10000 \
> /tmp/schema-history-backup-test.json
Step 2: Verify backup content
cat /tmp/schema-history-backup-test.json | wc -l
# Should show number of DDL events
head -3 /tmp/schema-history-backup-test.json
# Should show DDL events
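A quick sanity check can be scripted before trusting the backup. A sketch: the file name follows this exercise, and the CREATE/ALTER TABLE patterns are an assumption about what the history events contain.

```shell
# Fail fast if the backup is empty or contains no DDL statements.
verify_backup() {
  f="$1"
  if [ ! -s "$f" ]; then
    echo "EMPTY backup: $f"
    return 1
  fi
  if grep -q -i -e 'CREATE TABLE' -e 'ALTER TABLE' "$f"; then
    echo "backup OK: $(wc -l < "$f") lines"
  else
    echo "no DDL found in $f"
    return 1
  fi
}

# verify_backup /tmp/schema-history-backup-test.json
```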
Step 3: Pause the connector
curl -X PUT http://localhost:8083/connectors/mysql-connector/pause
Step 4: Delete the schema history topic
kafka-topics --bootstrap-server kafka:9092 \
--delete \
--topic schema-history.mysql-connector
Step 5: Recreate the topic
kafka-topics --bootstrap-server kafka:9092 \
--create \
--topic schema-history.mysql-connector \
--partitions 1 \
--replication-factor 1 \
--config retention.ms=-1 \
--config retention.bytes=-1
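Before restoring, it is worth confirming the new topic really has infinite retention, since a finite value would silently purge the restored history again. A minimal check over `kafka-topics --describe` output (a sketch):

```shell
# Reads `kafka-topics --describe` output on stdin and checks the retention config.
check_infinite_retention() {
  if grep -q 'retention.ms=-1'; then
    echo "retention OK"
  else
    echo "retention NOT infinite"
    return 1
  fi
}

# kafka-topics --bootstrap-server kafka:9092 --describe \
#   --topic schema-history.mysql-connector | check_infinite_retention
```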
Step 6: Restore from backup
cat /tmp/schema-history-backup-test.json | \
awk -F'\t' '{print $3}' | \
kafka-console-producer \
--bootstrap-server kafka:9092 \
--topic schema-history.mysql-connector
Step 7: Verify the restore
kafka-run-class kafka.tools.GetOffsetShell \
--bootstrap-server kafka:9092 \
--topic schema-history.mysql-connector
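The end offsets reported above can be compared against the backup's line count; with a single partition, the end offset equals the number of restored records. A sketch, assuming GetOffsetShell's topic:partition:offset output format:

```shell
# Sum the per-partition end offsets printed as topic:partition:offset.
restored_count() {
  awk -F: '{sum += $NF} END {print sum + 0}'
}

# kafka-run-class kafka.tools.GetOffsetShell --bootstrap-server kafka:9092 \
#   --topic schema-history.mysql-connector | restored_count
# Compare with: wc -l < /tmp/schema-history-backup-test.json
```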
Step 8: Resume the connector
curl -X PUT http://localhost:8083/connectors/mysql-connector/resume
Step 9: Verify the connector resumed
curl http://localhost:8083/connectors/mysql-connector/status | jq
# Should show state: "RUNNING"
Expected result: the connector resumed without errors, schema history restored.
Summary Checklist
Pre-Production Setup
- Configure snapshot.mode=when_needed for automatic binlog position recovery
- Set schema history topic retention: retention.ms=-1 AND retention.bytes=-1
- Configure adequate binlog retention: binlog_expire_logs_seconds = max_downtime * safety_margin
- Set up heartbeat events: heartbeat.interval.ms=10000
- Configure monitoring alerts: lag, retention config, topic size
- Set up a daily backup script for the schema history topic
- Test the backup restore procedure (monthly drill)
- Document recovery procedures in the runbook
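The daily backup item above could look roughly like this. A sketch: the backup directory, topic name, and 30-day retention window are assumptions to adapt to your environment.

```shell
# Dump the schema history topic to a dated file and prune old backups.
backup_schema_history() {
  dir="$1"; topic="$2"; bootstrap="$3"
  out="$dir/schema-history-$(date +%Y-%m-%d).json"
  mkdir -p "$dir"
  kafka-console-consumer \
    --bootstrap-server "$bootstrap" \
    --topic "$topic" \
    --from-beginning \
    --property print.key=true \
    --property print.timestamp=true \
    --timeout-ms 10000 > "$out"
  # Keep roughly a month of daily backups
  find "$dir" -name 'schema-history-*.json' -mtime +30 -delete
  echo "$out"
}

# Run from cron, e.g.:
# 0 2 * * * backup_schema_history /var/backups/schema-history schema-history.mysql-connector kafka:9092
```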
During Incident
- Identify error type from connector logs (binlog vs schema history)
- Check available binlog files vs required position
- Check schema history topic exists and has correct retention
- If it is a schema history issue: verify that no DDL ran since the last recorded offset
- Choose the recovery path based on the diagnosis
- Document the incident timeline for the post-mortem
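The first checklist item can be partially automated by pattern-matching the error text against the decision table from the start of this lesson. A sketch only: the exact error messages vary between Debezium and MySQL versions, so treat the patterns as illustrative.

```shell
# Map a connector error message to a recovery path from the decision tree.
classify_failure() {
  case "$1" in
    *"binlog file"*"not available"*|*"Could not find first log"*)
      echo "binlog-position-loss: snapshot.mode=when_needed or initial" ;;
    *"history topic"*|*"schema history"*|*"database history"*)
      echo "schema-history-corruption: restore backup, or snapshot.mode=recovery if no DDL since last offset" ;;
    *)
      echo "unknown: read the full connector stack trace" ;;
  esac
}

# classify_failure "$(docker compose logs connect | grep ERROR | tail -1)"
```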
Post-Recovery
- Verify connector state: RUNNING
- Test the event flow: insert test data, check Kafka topics
- Verify downstream consumers are processing without errors
- Reset snapshot.mode back to when_needed (if a recovery mode was used)
- Update monitoring dashboards
- Update alerts based on lessons learned
- Schedule a post-mortem meeting
- Update the runbook with the actual recovery time
Key Takeaways
- Binlog position loss happens when the connector is offline longer than binlog_expire_logs_seconds
- snapshot.mode=when_needed provides automatic recovery from a binlog purge (it triggers a snapshot)
- Schema history corruption happens when Kafka retention purges the schema history topic
- snapshot.mode=recovery rebuilds the schema history from the current DB schema (recovery in seconds)
- CRITICAL: verify no DDL changes before using recovery mode → silent corruption if DDL happened
- Prevention is cheaper than recovery: infinite retention costs megabytes, recovery costs hours
- Defense in depth: retention + backups + monitoring = 99.9% uptime
- Regular backups enable recovery in minutes versus hours of resnapshotting
- Always verify the recovery before resuming downstream consumers
- Test recovery procedures monthly in a staging environment
- Document runbooks with step-by-step procedures for on-call engineers
- when_needed may produce duplicates during the snapshot → consumers must be idempotent
What's Next?
We have completed the recovery procedures for MySQL CDC. You now know:
- How to diagnose binlog position loss vs schema history corruption
- Step-by-step recovery workflows for both scenarios
- Defense-in-depth prevention strategies
- Backup and restore procedures for fast recovery
Module 8 Complete! You now understand:
- MySQL binlog architecture and formats (Lesson 01)
- GTID mode for failover resilience (Lesson 02)
- Binlog retention and heartbeat (Lesson 03)
- Connector configuration (Lesson 04)
- PostgreSQL vs MySQL comparison (Lesson 05)
- Schema history topic internals (Lesson 06)
- Aurora-specific configuration (Lessons 07-09)
- Monitoring and lag metrics (Lesson 10)
- GTID failover procedures (Lesson 11)
- Incremental snapshots (Lesson 12)
- Recovery procedures (Lesson 13) ← We are here
Next Module: Advanced Topics — multi-connector deployments, online DDL integration (gh-ost/pt-osc), and GTID migration strategies.
You are ready for production MySQL CDC deployments with confidence in your recovery capabilities.