Skip to content
Learning Platform
Advanced
35 minutes
mysql debezium recovery snapshot-modes backup

Prerequisites:

  • module-8/06-schema-history-recovery
  • module-8/03-binlog-retention-heartbeat

Recovery Procedures: Binlog Loss и Schema History Corruption

Production Reality: Сбои неизбежны

Вы настроили правильный retention. Вы настроили heartbeat. Вы мониторите lag. И всё равно — сбои происходят:

  • Binlog purge из-за неожиданного downtime (Kafka cluster сбой, сетевой split, длительный deploy)
  • Schema history topic corruption из-за Kafka retention policy, accidental deletion, broker failure
  • Connector crash во время DDL операции с частичной записью в schema history
  • Manual mistakes — случайный PURGE BINARY LOGS, изменение retention без анализа последствий

В этом уроке мы изучим как диагностировать и восстановиться от двух самых критичных сбоев MySQL CDC в production:

  1. Binlog Position Loss — connector требует binlog файл, который уже purged
  2. Schema History Topic Corruption — schema history topic полностью или частично отсутствует

Мы рассмотрим decision tree для диагностики, step-by-step recovery workflows, prevention strategies в формате defense-in-depth, и backup procedures для быстрого восстановления.

Note

Prevention vs Recovery Trade-off

Prevention всегда дешевле recovery:

  • Infinite retention на schema history: бесплатно (несколько MB disk space)
  • Recovery от corrupted schema history: часы downtime + resnapshot
  • Adequate binlog retention: копейки disk space
  • Recovery от purged binlog: resnapshot 500GB таблицы = 6+ часов

Но даже с perfect prevention нужно знать recovery procedures для edge cases.

Recovery Decision Tree

Когда connector падает, первый шаг — диагностика типа сбоя по error message.

Recovery Decision Tree
Connector Failed
Status: FAILED
read error
Analyze Error Message
"binlog file", "purged",
"file not available"
Binlog Position Loss
1. SHOW BINARY LOGS
2. Check offset topic
3. Compare positions
snapshot.mode=
when_needed
"db history topic",
"partially missing"
Schema History
Corruption
DDL changes
since last offset?
Нет DDL
recovery mode
Были DDL
fresh connector
Есть backup
restore backup
"not found in schema",
"column mismatch"
Schema Mismatch
Обычно следствие
Schema History issue
See Scenario 2
Error Message Patterns
Error PatternScenarioRecovery Path
binlog file not availableBinlog Position Losswhen_needed / initial
history topic missingSchema History Corruptionrecovery / backup / fresh
table not found in schemaSchema MismatchSee Schema History

Error Message Patterns для быстрой диагностики:

Error PatternScenarioLesson Reference
Connector requires binlog file 'mysql-bin.001234', but MySQL only has mysql-bin.001456Binlog Position LossСм. Lesson 03 (Binlog Retention)
The db history topic or its content is fully or partially missingSchema History CorruptionСм. Lesson 06 (Schema History)
Table 'customers' not found in schema historySchema MismatchОбычно следствие Scenario 2
Column count doesn't match value count at row 1Schema MismatchОбычно следствие Scenario 2

Scenario 1: Binlog Position Loss

Симптомы

Connector fails при старте или во время streaming с ошибкой:

ERROR: Error during binlog reading
io.debezium.DebeziumException: The connector is trying to read binlog starting at
binlog file 'mysql-bin.000134', pos=4567890, but this file no longer exists on the server.

Или через MySQL replication protocol:

ERROR 1236 (HY000): Could not find first log file name in binary log index file

Причины

  1. Binlog retention expired — connector был offline дольше binlog_expire_logs_seconds
  2. Manual PURGE BINARY LOGS — DBA вручную удалил старые binlog файлы для освобождения disk space
  3. MySQL restart с агрессивным retention — binlog_expire_logs_seconds изменён на меньшее значение
  4. Long-running snapshot — snapshot длится дольше binlog retention, position purged до завершения snapshot (см. Lesson 03, Pitfall 2)

Diagnosis Steps

Шаг 1: Проверить доступные binlog файлы

Подключитесь к MySQL:

docker compose exec mysql mysql -u root -p

Проверьте список binlog файлов:

SHOW BINARY LOGS;

Вывод покажет самый старый доступный файл:

+------------------+-----------+-----------+
| Log_name         | File_size | Encrypted |
+------------------+-----------+-----------+
| mysql-bin.000255 | 1024      | No        |
| mysql-bin.000256 | 2048      | No        |
| mysql-bin.000257 | 4096      | No        |
+------------------+-----------+-----------+

Самый старый файл: mysql-bin.000255

Шаг 2: Проверить required position в offset topic

kafka-console-consumer \
  --bootstrap-server kafka:9092 \
  --topic connect-offsets \
  --from-beginning \
  --property print.key=true \
  | grep mysql-connector

Найдите последний offset для вашего connector:

["mysql-connector",{"server":"mysql-server"}]
{
  "file": "mysql-bin.000134",
  "pos": 4567890,
  "gtids": "3e11fa47-71ca-11e1-9e33-c80aa9429562:1-150"
}

Required file: mysql-bin.000134

Шаг 3: Сравнить positions

Required:  mysql-bin.000134 (from offset)
Available: mysql-bin.000255 (oldest on MySQL)

000134 < 000255 → Position LOST

Connector не может продолжить — binlog файлы 000134-000254 уже purged.

Шаг 4: Оценить data gap

Если у вас GTID mode enabled, проверьте purged GTID set:

SHOW GLOBAL VARIABLES LIKE 'gtid_purged';

Вывод:

+---------------+------------------------------------------+
| Variable_name | Value                                    |
+---------------+------------------------------------------+
| gtid_purged   | 3e11fa47-71ca-11e1-9e33-c80aa9429562:1-200 |
+---------------+------------------------------------------+

Сравните с offset GTID (1-150):

  • Offset: до 150
  • Purged: до 200
  • Gap: GTID 151-200 потеряны
Проверка знаний
Как диагностировать binlog position loss и определить размер потерянных данных?
Ответ
Нужно сравнить required binlog file из connector offset topic (connect-offsets) с oldest available file на MySQL (SHOW BINARY LOGS). Если required file старше oldest available — position lost. Для оценки data gap при включённом GTID сравните gtid_purged на MySQL с GTID в offset: разница между ними показывает количество потерянных транзакций.

Recovery Procedure

Warning

when_needed может создать дублирующие события

Во время automatic snapshot connector будет читать текущее состояние таблиц. Если между last offset и сейчас были INSERT операции, вы получите duplicate events для этих записей:

  1. Событие от streaming до position loss
  2. То же событие от snapshot после recovery

Downstream consumers должны быть idempotent или иметь deduplication logic.

Option A: Automatic Recovery с snapshot.mode=when_needed

Когда использовать: Production deployments, где нужен minimal manual intervention.

Как работает: Connector автоматически детектирует, что binlog position unavailable, и триггерит snapshot (см. Lesson 09 для snapshot mechanics).

Конфигурация:

{
  "name": "mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "mysql-server",
    "table.include.list": "inventory.*",

    "snapshot.mode": "when_needed",

    "schema.history.internal.kafka.topic": "schema-history.mysql-connector",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",

    "heartbeat.interval.ms": "10000",
    "heartbeat.action.query": "INSERT INTO inventory.debezium_heartbeat (id, ts) VALUES (1, NOW()) ON DUPLICATE KEY UPDATE ts = NOW()"
  }
}

Применить конфигурацию:

curl -X PUT http://localhost:8083/connectors/mysql-connector/config \
  -H "Content-Type: application/json" \
  -d @mysql-connector-config.json

Restart connector:

curl -X POST http://localhost:8083/connectors/mysql-connector/restart

Мониторить snapshot progress:

# Проверить connector status
curl http://localhost:8083/connectors/mysql-connector/status | jq

# Смотреть logs
docker compose logs -f connect | grep -i snapshot

Вы увидите сообщения:

INFO Snapshot mode is 'when_needed', determining if snapshot is required
INFO Snapshot is needed due to unavailable binlog position
INFO Starting snapshot for 3 table(s) in database inventory
INFO Step 1: acquiring global read lock to prevent writes to database
INFO Step 2: reading schema for captured tables
INFO Step 3: snapshotting data for table inventory.customers
...
INFO Snapshot completed successfully
INFO Transitioning to streaming mode from binlog position mysql-bin.000257:1024

Recovery time: Пропорционален размеру данных (1TB таблица = ~несколько часов).

Option B: Manual Reset с snapshot.mode=initial

Когда использовать: Если нужен fresh start или если when_needed не работает (edge cases).

Шаг 1: Остановить и удалить connector

curl -X DELETE http://localhost:8083/connectors/mysql-connector

Шаг 2: Очистить offset topic (опционально)

Если connector пересоздается с тем же именем, Kafka Connect попытается использовать старый offset. Чтобы force fresh start, нужно удалить offset.

# Найти offset key для удаления
kafka-console-consumer \
  --bootstrap-server kafka:9092 \
  --topic connect-offsets \
  --from-beginning \
  --property print.key=true \
  | grep mysql-connector

# Удалить offset через tombstone record (advanced)
# Или пересоздать connector с новым именем

Проще: создать connector с новым именем

# Новое имя: mysql-connector-v2

Шаг 3: Создать connector с snapshot.mode=initial

{
  "name": "mysql-connector-v2",
  "config": {
    "snapshot.mode": "initial",
    ...
  }
}
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @mysql-connector-v2-config.json

Шаг 4: Мониторить snapshot

См. Option A для monitoring.

Recovery time: То же, что Option A (snapshot proportional to data size).

Prevention

Layer 1: Adequate Binlog Retention (см. Lesson 03)

Формула:

binlog_expire_logs_seconds = Max(
  max_expected_downtime,
  snapshot_duration
) * safety_margin

Для 500GB таблицы с 6-hour snapshot:

Minimum retention = 6 hours * 1.5 (safety) = 9 hours
Recommended = 14+ days (336+ hours)

Layer 2: Heartbeat Events (см. Lesson 03)

Ensure offset постоянно обновляется даже на idle таблицах:

{
  "heartbeat.interval.ms": "10000",
  "heartbeat.action.query": "INSERT INTO inventory.debezium_heartbeat (id, ts) VALUES (1, NOW()) ON DUPLICATE KEY UPDATE ts = NOW()"
}

Layer 3: Monitoring Lag (см. Lesson 10)

Alert на:

- alert: DebeziumBinlogLagHigh
  expr: debezium_metrics_MilliSecondsBehindSource > 432000000  # 5 days
  for: 1h

Layer 4: High Availability Kafka Connect

Kafka Connect в distributed mode с multiple workers:

  • Automatic connector failover при worker crash
  • Reduces downtime window

Scenario 2: Schema History Topic Corruption

Симптомы

Connector fails при старте с ошибкой:

ERROR: The db history topic 'schema-history.mysql-connector' is fully or partially missing
Unable to restore history from Kafka

Или во время streaming:

ERROR: Exception while processing event
org.apache.kafka.common.errors.SerializationException: Error deserializing schema history event

Или schema mismatch errors:

ERROR: Table 'customers' not found in schema history
ERROR: Column 'phone' doesn't exist (but it exists in DB)

Причины

  1. Kafka retention purged topic — default 7-day retention удалил старые DDL records (см. Lesson 06, “7-дневная бомба”)
  2. Accidental topic deletionkafka-topics --delete выполнен по ошибке
  3. Broker failure с data loss — Kafka broker crashed до replication completed
  4. Partial write during DDL — Connector crashed во время записи DDL event, topic contains incomplete JSON

Diagnosis Steps

Шаг 1: Проверить существование topic

kafka-topics --bootstrap-server kafka:9092 \
  --list | grep schema-history

Если topic отсутствует:

(no output) → Topic deleted

Шаг 2: Проверить retention configuration

kafka-configs --bootstrap-server kafka:9092 \
  --entity-type topics \
  --entity-name schema-history.mysql-connector \
  --describe

Ожидаемый вывод:

retention.ms=-1
retention.bytes=-1

Если вы видите:

retention.ms=604800000   # 7 days

Проблема найдена: Retention purged topic.

Шаг 3: Подсчитать messages в topic

kafka-run-class kafka.tools.GetOffsetShell \
  --bootstrap-server kafka:9092 \
  --topic schema-history.mysql-connector

Вывод:

schema-history.mysql-connector:0:15

15 messages в topic.

Сравните с ожидаемым количеством DDL операций:

  • Если вы выполнили 50 DDL, но в topic только 15 → partial loss
  • Если topic пуст (0 messages) → complete loss

Шаг 4: Прочитать topic content

kafka-console-consumer \
  --bootstrap-server kafka:9092 \
  --topic schema-history.mysql-connector \
  --from-beginning \
  --timeout-ms 5000

Проверьте:

  • Неполные JSON messages (corruption)
  • Missing DDL events для известных таблиц
  • Duplicate events (edge case)

Recovery Procedure

Danger

CRITICAL: Verify No DDL Before Using recovery Mode

snapshot.mode=recovery читает текущую схему DB и предполагает, что она совпадает со схемой на момент last offset.

Если между last offset и сейчас произошли DDL изменения → connector будет применять неправильную схему к событиям → silent data corruption в downstream consumers.

Пример катастрофы:

  1. Last offset: mysql-bin.000100, pos 5000
  2. Между 5000 и 10000: ALTER TABLE customers DROP COLUMN email
  3. Recovery mode читает current schema (без email)
  4. Connector обрабатывает события 5000-10000 с current schema
  5. События с email данными обрабатываются как если бы email не было
  6. Data loss в consumers

Всегда проверяйте DDL history перед recovery mode!

Проверка знаний
Почему snapshot.mode=recovery опасен, если между last offset и текущим моментом произошли DDL изменения?
Ответ
Recovery mode читает текущую схему базы данных и предполагает, что она совпадает со схемой на момент last offset. Если между ними был ALTER TABLE (например, DROP COLUMN email), connector будет применять текущую схему к старым событиям, которые ещё содержат удалённую колонку. Это приводит к silent data corruption — downstream consumers получают некорректные данные без каких-либо ошибок.

Prerequisite: DDL Verification

Перед использованием snapshot.mode=recovery обязательно проверьте, были ли DDL изменения с last offset.

Шаг 1: Найти last offset position

kafka-console-consumer \
  --bootstrap-server kafka:9092 \
  --topic connect-offsets \
  --from-beginning \
  --property print.key=true \
  | grep mysql-connector

Результат:

{
  "file": "mysql-bin.000255",
  "pos": 4567890
}

Шаг 2: Проверить binlog на DDL events с этой позиции

Подключитесь к MySQL:

docker compose exec mysql mysql -u root -p

Ищите DDL события:

SHOW BINLOG EVENTS IN 'mysql-bin.000255' FROM 4567890 LIMIT 1000;

Или через несколько файлов:

SHOW BINLOG EVENTS IN 'mysql-bin.000255';
SHOW BINLOG EVENTS IN 'mysql-bin.000256';
SHOW BINLOG EVENTS IN 'mysql-bin.000257';

Ищите Event_type:

  • Query с DDL statements: CREATE, ALTER, DROP, RENAME, TRUNCATE

Пример DDL event:

+------------------+------+------------+-----------+-------------+---------------------------------------+
| Log_name         | Pos  | Event_type | Server_id | End_log_pos | Info                                  |
+------------------+------+------------+-----------+-------------+---------------------------------------+
| mysql-bin.000255 | 5000 | Query      | 1         | 5100        | ALTER TABLE customers ADD COLUMN phone|
+------------------+------+------------+-----------+-------------+---------------------------------------+

Если нашли DDL: НЕ используйте recovery mode → переходите к Option C (Fresh Connector).

Если DDL НЕТ: Безопасно использовать recovery mode → переходите к Option A.

Option A: snapshot.mode=recovery (If No DDL)

Когда использовать: Schema history topic corrupted/lost, но DDL изменений не было с last offset.

Как работает: Connector читает current database schema и rebuilds schema history topic.

Конфигурация:

{
  "name": "mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "mysql-server",
    "table.include.list": "inventory.*",

    "snapshot.mode": "recovery",

    "schema.history.internal.kafka.topic": "schema-history.mysql-connector",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092"
  }
}

Применить конфигурацию:

curl -X PUT http://localhost:8083/connectors/mysql-connector/config \
  -H "Content-Type: application/json" \
  -d @mysql-connector-recovery-config.json

Restart connector:

curl -X POST http://localhost:8083/connectors/mysql-connector/restart

Мониторить recovery:

docker compose logs -f connect | grep -i recovery

Вы увидите:

INFO Snapshot mode is 'recovery', recovering database schema history
INFO Reading current schema for table inventory.customers
INFO Reading current schema for table inventory.orders
INFO Schema history topic rebuilt with 3 tables
INFO Recovery completed, transitioning to streaming mode

КРИТИЧНО: После recovery завершён, измените snapshot.mode обратно:

{
  "snapshot.mode": "when_needed"
}
curl -X PUT http://localhost:8083/connectors/mysql-connector/config \
  -H "Content-Type: application/json" \
  -d '{"snapshot.mode": "when_needed"}'

Почему: snapshot.mode=recovery предназначен только для one-time recovery, не для normal operation. Оставив его, вы рискуете silent failures при следующем restart.

Recovery time: Секунды-минуты (читает только metadata, не data).

Option B: Restore from Backup (Fastest)

Когда использовать: У вас есть recent backup schema history topic.

Как работает: Restore backup в новый topic, connector resume normально.

Шаг 1: Создать новый schema history topic (если удалён)

kafka-topics --bootstrap-server kafka:9092 \
  --create \
  --topic schema-history.mysql-connector \
  --partitions 1 \
  --replication-factor 3 \
  --config retention.ms=-1 \
  --config retention.bytes=-1

Шаг 2: Restore из backup

# Unzip backup если gzipped
gunzip schema-history-backup-2026-01-31.json.gz

# Restore сообщения
# ВАЖНО: Удалите timestamps/keys из backup, оставьте только JSON payloads
cat schema-history-backup-2026-01-31.json | \
  awk -F'\t' '{print $3}' | \
  kafka-console-producer \
    --bootstrap-server kafka:9092 \
    --topic schema-history.mysql-connector

Шаг 3: Проверить restore

kafka-run-class kafka.tools.GetOffsetShell \
  --bootstrap-server kafka:9092 \
  --topic schema-history.mysql-connector

Должны увидеть messages count соответствующий backup.

Шаг 4: Restart connector

curl -X POST http://localhost:8083/connectors/mysql-connector/restart

Recovery time: 1-5 минут (vs часы для resnapshot).

Option C: Fresh Start with New Connector (If DDL Happened)

Когда использовать: DDL изменения были с last offset, recovery mode небезопасен.

Шаг 1: Удалить old connector

curl -X DELETE http://localhost:8083/connectors/mysql-connector

Шаг 2: Удалить corrupted schema history topic

kafka-topics --bootstrap-server kafka:9092 \
  --delete \
  --topic schema-history.mysql-connector

Шаг 3: Создать new schema history topic

kafka-topics --bootstrap-server kafka:9092 \
  --create \
  --topic schema-history.mysql-connector-v2 \
  --partitions 1 \
  --replication-factor 3 \
  --config retention.ms=-1 \
  --config retention.bytes=-1

Шаг 4: Создать connector с новым именем и snapshot.mode=initial

{
  "name": "mysql-connector-v2",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184055",
    "database.server.name": "mysql-server",
    "table.include.list": "inventory.*",

    "snapshot.mode": "initial",

    "schema.history.internal.kafka.topic": "schema-history.mysql-connector-v2",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092"
  }
}
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @mysql-connector-v2-config.json

Последствия:

  • Full resnapshot required
  • Downstream consumers получат duplicate events (snapshot + streaming)
  • Требуется idempotent consumers или manual deduplication

Recovery time: Пропорционален data size (часы для больших таблиц).

Prevention: Defense in Depth

Recovery Procedure Flow
1. Detect Issue
Connector: FAILED
2. Stop Connector
CRITICAL
3. Diagnose Cause
Check logs, binlog, schema history
4. Apply Fix
Based on diagnosis
5. Verify Fix
Test before resume
6. Resume CDC
Monitor 1+ hour
Critical: Stop Before Fix
ПРАВИЛО:Всегда останавливайте connector перед исправлением.
Попытка исправить running connector может привести к:
  • Data duplication (partial snapshot + streaming)
  • Offset corruption (race condition)
  • Schema history conflict (concurrent writes)
Recovery Time Estimates
Recovery MethodTime EstimateWhen to Use
Restore from backup1-5 minutesBackup exists, schema history corruption
snapshot.mode=recoverySeconds-minutesNo DDL since last offset
snapshot.mode=when_neededHours (data-dependent)Binlog position loss
Fresh connector + initialHours (data-dependent)DDL happened, no other option

Layer 1: Infinite Retention на Schema History Topic

# После connector создал topic, проверьте retention
kafka-configs --bootstrap-server kafka:9092 \
  --entity-type topics \
  --entity-name schema-history.mysql-connector \
  --describe

# Если не установлен, добавьте
kafka-configs --bootstrap-server kafka:9092 \
  --entity-type topics \
  --entity-name schema-history.mysql-connector \
  --alter \
  --add-config retention.ms=-1,retention.bytes=-1

Почему оба параметра:

  • retention.ms=-1: Time-based retention disabled
  • retention.bytes=-1: Size-based retention disabled

Если установить только retention.ms, broker-level log.retention.bytes может purge topic.

Layer 2: Adequate Binlog Retention

Формула из Lesson 03:

binlog_expire_logs_seconds = Max(
  snapshot_duration,
  max_expected_downtime
) * 1.5 (safety margin)

Для 500GB table:

  • Snapshot duration: ~6 hours
  • Safety margin: 1.5x
  • Minimum: 9 hours (32400 seconds)
  • Recommended: 14+ days (1209600+ seconds)
SET GLOBAL binlog_expire_logs_seconds = 1209600;

Make persistent в my.cnf:

[mysqld]
binlog_expire_logs_seconds = 1209600

Layer 3: Regular Backups

См. секцию “Schema History Backup and Restore” ниже.

Layer 4: Monitoring

Prometheus alerting rules:

groups:
  - name: debezium_schema_history
    rules:
      - alert: SchemaHistoryRetentionIncorrect
        expr: kafka_topic_config{topic=~"schema-history.*",key="retention.ms"} != -1
        for: 5m
        annotations:
          summary: "Schema history topic has finite retention (CRITICAL)"

      - alert: SchemaHistoryMessageCountDrop
        expr: delta(kafka_log_size{topic=~"schema-history.*"}[1h]) < -1
        for: 5m
        annotations:
          summary: "Schema history topic size decreased (possible purge)"

      - alert: DebeziumOffsetAgeTooOld
        expr: (time() - debezium_metrics_LastEventTimestamp) > 432000  # 5 days
        for: 1h
        annotations:
          summary: "Debezium offset age approaching binlog retention"

Schema History Backup and Restore

Tip

Backup перед connector upgrades

Всегда создавайте backup schema history topic перед:

  • Debezium connector version upgrade
  • Major DDL migrations
  • Kafka Connect upgrade

Recovery из backup = минуты. Resnapshot после corruption = часы.

Backup Procedure

Manual backup (on-demand):

# Export schema history topic в файл
kafka-console-consumer \
  --bootstrap-server kafka:9092 \
  --topic schema-history.mysql-connector \
  --from-beginning \
  --property print.key=true \
  --property print.timestamp=true \
  --timeout-ms 30000 \
  > schema-history-backup-$(date +%Y%m%d-%H%M%S).json

# Compress для экономии места
gzip schema-history-backup-*.json

Automated backup script:

#!/bin/bash
# /usr/local/bin/backup-debezium-schema-history.sh

KAFKA_BOOTSTRAP="kafka:9092"
BACKUP_DIR="/backups/debezium-schema-history"
DATE=$(date +%Y%m%d-%H%M%S)

# Список всех schema history topics
TOPICS=$(kafka-topics --bootstrap-server "$KAFKA_BOOTSTRAP" --list | grep "^schema-history\.")

mkdir -p "$BACKUP_DIR"

for TOPIC in $TOPICS; do
  echo "Backing up $TOPIC..."

  kafka-console-consumer \
    --bootstrap-server "$KAFKA_BOOTSTRAP" \
    --topic "$TOPIC" \
    --from-beginning \
    --property print.key=true \
    --property print.timestamp=true \
    --timeout-ms 30000 \
    > "$BACKUP_DIR/${TOPIC}-${DATE}.json"

  gzip "$BACKUP_DIR/${TOPIC}-${DATE}.json"

  echo "Backup saved: $BACKUP_DIR/${TOPIC}-${DATE}.json.gz"
done

# Удалить backups старше 30 дней
find "$BACKUP_DIR" -name "schema-history-*.json.gz" -mtime +30 -delete

echo "Backup completed: $(date)"

Cron schedule (daily at 2 AM):

0 2 * * * /usr/local/bin/backup-debezium-schema-history.sh >> /var/log/debezium-backup.log 2>&1

Backup frequency recommendations:

EnvironmentFrequencyRetention
Production (active DDL)Daily + before schema changes30+ days
Production (stable)Weekly60+ days
StagingBefore major changes14 days
DevBefore connector upgrades7 days

Restore Procedure

Шаг 1: Подготовить backup file

# Unzip backup
gunzip schema-history-backup-20260201-140000.json.gz

# Inspect format
head -3 schema-history-backup-20260201-140000.json

Формат backup:

CreateTime:1706745600000    null    {"ddl":"CREATE DATABASE `inventory`",...}
CreateTime:1706745601000    null    {"ddl":"CREATE TABLE `customers`",...}
CreateTime:1706745602000    null    {"ddl":"ALTER TABLE `customers`",...}

Шаг 2: Extract JSON payloads (удалить timestamps и keys)

cat schema-history-backup-20260201-140000.json | \
  awk -F'\t' '{print $3}' \
  > schema-history-payloads.json

Шаг 3: Создать schema history topic (если удалён)

kafka-topics --bootstrap-server kafka:9092 \
  --create \
  --topic schema-history.mysql-connector \
  --partitions 1 \
  --replication-factor 3 \
  --config retention.ms=-1 \
  --config retention.bytes=-1

Шаг 4: Restore payloads в topic

kafka-console-producer \
  --bootstrap-server kafka:9092 \
  --topic schema-history.mysql-connector \
  < schema-history-payloads.json

Шаг 5: Verify restore

# Подсчитать messages
kafka-run-class kafka.tools.GetOffsetShell \
  --bootstrap-server kafka:9092 \
  --topic schema-history.mysql-connector

# Прочитать первое и последнее сообщение
kafka-console-consumer \
  --bootstrap-server kafka:9092 \
  --topic schema-history.mysql-connector \
  --from-beginning \
  --max-messages 1

kafka-console-consumer \
  --bootstrap-server kafka:9092 \
  --topic schema-history.mysql-connector \
  --partition 0 \
  --offset latest \
  --max-messages 1

Шаг 6: Restart connector

curl -X POST http://localhost:8083/connectors/mysql-connector/restart

Мониторить startup:

docker compose logs -f connect | grep mysql-connector

Ожидаемые сообщения:

INFO Reading schema history from Kafka topic schema-history.mysql-connector
INFO Schema history recovered with 15 DDL events
INFO Starting streaming from binlog position mysql-bin.000255:4567890

Recovery Time Estimates

Помогает планировать downtime для различных recovery сценариев.

Recovery MethodTime EstimateData DependencyDownside
snapshot.mode=when_neededProportional to data size
1TB = ~4-8 hours
YesMay produce duplicate events during snapshot
snapshot.mode=recoverySeconds to minutes
(reads schema only, not data)
NoDANGEROUS if DDL happened since last offset
Restore from backupSeconds to minutes
(Kafka topic restore)
NoRequires recent backup exists
Fresh connector (snapshot.mode=initial)Snapshot time + downstream reprocessing
1TB = ~4-8 hours + consumer lag
YesDuplicate events in consumers, requires idempotency
Manual offset resetMinutes (if offset position known)NoRisk of duplicate/missing events if wrong

Пример планирования для 500GB database:

  1. Best case (backup exists):

    • Restore schema history: 2 minutes
    • Connector restart: 30 seconds
    • Total downtime: ~3 minutes
  2. Moderate case (recovery mode safe):

    • DDL verification: 5 minutes
    • Recovery mode execution: 1 minute
    • Connector resume: 30 seconds
    • Total downtime: ~7 minutes
  3. Worst case (resnapshot required):

    • Snapshot 500GB: ~3-6 hours
    • Downstream consumer catch-up: ~1 hour
    • Total downtime: ~4-7 hours

Planning recommendation: Aim for best case через regular backups.

Common Mistakes During Recovery

Danger

Top 5 Recovery Mistakes

  1. Using recovery mode when DDL happened → silent data corruption
  2. Forgetting to change snapshot.mode back after recovery → connector stays in recovery mode
  3. Not verifying recovery before resuming downstream → propagating corruption
  4. Skipping backup verification → discovering backup is corrupt during emergency
  5. Deleting old connector before verifying new one works → losing offset/schema history

Mistake 1: Recovery Mode with DDL Changes

Что происходит:

-- Last offset: mysql-bin.000100, pos 5000
-- Developer выполнил DDL:
ALTER TABLE customers ADD COLUMN loyalty_points INT DEFAULT 0;

-- DBA видит schema history topic corrupted
-- Использует snapshot.mode=recovery БЕЗ проверки DDL

Последствия:

  • Connector читает current schema (с loyalty_points)
  • Применяет эту схему к событиям ДО DDL
  • События без loyalty_points интерпретируются с loyalty_points
  • Consumers получают corrupt data

Правильный подход:

Всегда проверяйте DDL history через SHOW BINLOG EVENTS (см. DDL Verification выше).

Mistake 2: Leaving Recovery Mode Active

Что происходит:

# Recovery выполнен
curl -X PUT http://localhost:8083/connectors/mysql-connector/config \
  -d '{"snapshot.mode": "recovery"}'

# Connector restart, recovery успешна
# Администратор забывает изменить snapshot.mode обратно

# Через неделю: connector restart по другой причине
# Recovery mode triggers снова → unnecessary schema rebuild

Правильный подход:

После recovery завершён:

curl -X PUT http://localhost:8083/connectors/mysql-connector/config \
  -d '{"snapshot.mode": "when_needed"}'

Mistake 3: Not Verifying Recovery

Что происходит:

  • Recovery completed
  • Connector status: RUNNING
  • Администратор assumes всё работает
  • Downstream consumers начинают обрабатывать events
  • Hours later: обнаруживаются schema mismatch errors в consumer logs

Правильный подход:

После recovery:

  1. Verify connector streaming:

    curl http://localhost:8083/connectors/mysql-connector/status | jq .connector.state
    # Должно быть: "RUNNING"
  2. Verify events flowing:

    kafka-console-consumer \
      --bootstrap-server kafka:9092 \
      --topic mysql-server.inventory.customers \
      --max-messages 5
  3. Test schema в consumers:

    # Trigger test INSERT
    docker compose exec mysql mysql -u root -p -e "INSERT INTO inventory.customers VALUES (9999, 'Test Recovery', '[email protected]')"
    
    # Verify consumer processes it без errors
    docker compose logs -f consumer-app | grep "Test Recovery"
  4. Monitor for 1 hour перед объявлением recovery successful.

Mistake 4: Untested Backups

Что происходит:

  • Backup script runs daily
  • Backups accumulate в storage
  • Emergency happens
  • Администратор tries restore
  • Backup file corrupted / format changed / incomplete

Правильный подход:

Monthly backup restore drill:

# 1. Create test schema history topic
kafka-topics --bootstrap-server kafka:9092 \
  --create \
  --topic schema-history.test-restore \
  --partitions 1 \
  --replication-factor 1 \
  --config retention.ms=-1

# 2. Restore latest backup в test topic
cat schema-history-backup-latest.json | \
  awk -F'\t' '{print $3}' | \
  kafka-console-producer \
    --bootstrap-server kafka:9092 \
    --topic schema-history.test-restore

# 3. Verify message count
kafka-run-class kafka.tools.GetOffsetShell \
  --bootstrap-server kafka:9092 \
  --topic schema-history.test-restore

# 4. Spot-check content
kafka-console-consumer \
  --bootstrap-server kafka:9092 \
  --topic schema-history.test-restore \
  --from-beginning \
  --max-messages 10

# 5. Clean up test topic
kafka-topics --bootstrap-server kafka:9092 \
  --delete \
  --topic schema-history.test-restore

Document drill results в runbook.

Mistake 5: Deleting Before Verifying

Что происходит:

# Recovery scenario: schema history corrupted
# Администратор creates new connector mysql-connector-v2
curl -X POST http://localhost:8083/connectors -d @mysql-connector-v2.json

# Немедленно удаляет old connector
curl -X DELETE http://localhost:8083/connectors/mysql-connector

# New connector fails to start (config error)
# Old connector deleted → offset topic entry gone
# **Impossible to rollback**

Правильный подход:

  1. Create new connector (mysql-connector-v2)
  2. Verify new connector RUNNING и streaming events
  3. Monitor for 1+ hours
  4. Pause old connector (not delete)
  5. After 24 hours success: delete old connector

Это дает safety net для rollback.

Hands-on Exercise: Simulated Recovery

Практические упражнения для отработки recovery procedures в безопасной среде.

Exercise 1: Binlog Purge Recovery

Цель: Simulate binlog purge и recover с snapshot.mode=when_needed.

Шаг 1: Запустить connector в нормальном режиме

# Если connector ещё не создан
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "mysql-connector",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "database.hostname": "mysql",
      "database.port": "3306",
      "database.user": "debezium",
      "database.password": "dbz",
      "database.server.id": "184054",
      "database.server.name": "mysql-server",
      "table.include.list": "inventory.*",
      "snapshot.mode": "when_needed",
      "schema.history.internal.kafka.topic": "schema-history.mysql-connector",
      "schema.history.internal.kafka.bootstrap.servers": "kafka:9092"
    }
  }'

Шаг 2: Создать test data

docker compose exec mysql mysql -u root -pmysql -e "
  USE inventory;
  INSERT INTO customers VALUES (100, 'Alice', '[email protected]');
  INSERT INTO customers VALUES (101, 'Bob', '[email protected]');
"

Шаг 3: Проверить current binlog position

docker compose exec mysql mysql -u root -pmysql -e "SHOW MASTER STATUS\G"

Запомните File и Position.

Шаг 4: Force binlog rotation

docker compose exec mysql mysql -u root -pmysql -e "FLUSH BINARY LOGS"

Шаг 5: Остановить connector

curl -X PUT http://localhost:8083/connectors/mysql-connector/pause

Шаг 6: Simulate binlog purge

# Проверить доступные binlog files
docker compose exec mysql mysql -u root -pmysql -e "SHOW BINARY LOGS"

# Manual purge старых файлов (оставьте только последний)
docker compose exec mysql mysql -u root -pmysql -e "PURGE BINARY LOGS TO 'mysql-bin.000005'"

Шаг 7: Resume connector (триггерит position loss)

curl -X PUT http://localhost:8083/connectors/mysql-connector/resume

Шаг 8: Observe error в logs

docker compose logs -f connect | grep -i "binlog"

Вы должны увидеть:

ERROR: Connector requires binlog file 'mysql-bin.000003', but MySQL only has mysql-bin.000005

Шаг 9: Verify snapshot.mode=when_needed triggers recovery

docker compose logs -f connect | grep -i snapshot

Вы должны увидеть:

INFO: Snapshot is needed due to unavailable binlog position
INFO: Starting snapshot for tables in database inventory

Шаг 10: Wait for snapshot completion

curl http://localhost:8083/connectors/mysql-connector/status | jq .connector.state

Когда state = “RUNNING”, snapshot завершён.

Шаг 11: Verify streaming resumed

# Insert new data
docker compose exec mysql mysql -u root -pmysql -e "
  INSERT INTO inventory.customers VALUES (102, 'Charlie', '[email protected]');
"

# Check Kafka topic
kafka-console-consumer \
  --bootstrap-server kafka:9092 \
  --topic mysql-server.inventory.customers \
  --from-beginning \
  --max-messages 10

Expected result: Connector recovered automatically, streaming resumed.

Exercise 2: Backup and Restore Schema History

Цель: Practice backup and restore workflow.

Шаг 1: Create backup

kafka-console-consumer \
  --bootstrap-server kafka:9092 \
  --topic schema-history.mysql-connector \
  --from-beginning \
  --property print.key=true \
  --property print.timestamp=true \
  --timeout-ms 10000 \
  > /tmp/schema-history-backup-test.json

Шаг 2: Verify backup content

cat /tmp/schema-history-backup-test.json | wc -l
# Should show number of DDL events

head -3 /tmp/schema-history-backup-test.json
# Should show DDL events

Шаг 3: Pause connector

curl -X PUT http://localhost:8083/connectors/mysql-connector/pause

Шаг 4: Delete schema history topic

kafka-topics --bootstrap-server kafka:9092 \
  --delete \
  --topic schema-history.mysql-connector

Шаг 5: Recreate topic

kafka-topics --bootstrap-server kafka:9092 \
  --create \
  --topic schema-history.mysql-connector \
  --partitions 1 \
  --replication-factor 1 \
  --config retention.ms=-1 \
  --config retention.bytes=-1

Шаг 6: Restore from backup

cat /tmp/schema-history-backup-test.json | \
  awk -F'\t' '{print $3}' | \
  kafka-console-producer \
    --bootstrap-server kafka:9092 \
    --topic schema-history.mysql-connector

Шаг 7: Verify restore

kafka-run-class kafka.tools.GetOffsetShell \
  --bootstrap-server kafka:9092 \
  --topic schema-history.mysql-connector

Шаг 8: Resume connector

curl -X PUT http://localhost:8083/connectors/mysql-connector/resume

Шаг 9: Verify connector resumed

curl http://localhost:8083/connectors/mysql-connector/status | jq

# Should show state: "RUNNING"

Expected result: Connector resumed без errors, schema history restored.

Summary Checklist

Pre-Production Setup

  • Configure snapshot.mode=when_needed for automatic binlog position recovery
  • Set schema history topic retention: retention.ms=-1 AND retention.bytes=-1
  • Configure adequate binlog retention: binlog_expire_logs_seconds = max_downtime * safety_margin
  • Setup heartbeat events: heartbeat.interval.ms=10000
  • Configure monitoring alerts: lag, retention config, topic size
  • Setup daily backup script for schema history topic
  • Test backup restore procedure (monthly drill)
  • Document recovery procedures в runbook

During Incident

  • Identify error type from connector logs (binlog vs schema history)
  • Check available binlog files vs required position
  • Check schema history topic exists and has correct retention
  • If schema history issue: Verify no DDL since last offset
  • Choose recovery path based on diagnosis
  • Document incident timeline для post-mortem

Post-Recovery

  • Verify connector state: RUNNING
  • Test event flow: insert test data, check Kafka topics
  • Verify downstream consumers processing без errors
  • Reset snapshot.mode back to when_needed (if used recovery mode)
  • Update monitoring dashboards
  • Update alerts based on lessons learned
  • Schedule post-mortem meeting
  • Update runbook с actual recovery time

Key Takeaways

  1. Binlog position loss happens when connector offline дольше binlog_expire_logs_seconds
  2. snapshot.mode=when_needed provides automatic recovery от binlog purge (triggers snapshot)
  3. Schema history corruption happens when Kafka retention purges schema history topic
  4. snapshot.mode=recovery rebuilds schema history from current DB schema (seconds recovery)
  5. CRITICAL: Verify no DDL changes before using recovery mode → silent corruption if DDL happened
  6. Prevention cheaper than recovery: infinite retention costs MB, recovery costs hours
  7. Defense in depth: retention + backups + monitoring = 99.9% uptime
  8. Regular backups enable minutes recovery vs hours resnapshot
  9. Always verify recovery перед resuming downstream consumers
  10. Test recovery procedures monthly в staging environment
  11. Document runbooks с step-by-step procedures для on-call engineers
  12. when_needed may produce duplicates during snapshot → consumers must be idempotent

Что дальше?

Мы завершили изучение recovery procedures для MySQL CDC. Теперь вы знаете:

  • Как диагностировать binlog position loss vs schema history corruption
  • Step-by-step recovery workflows для обоих сценариев
  • Prevention strategies в формате defense-in-depth
  • Backup and restore procedures для fast recovery

Module 8 Complete! Вы теперь понимаете:

  • MySQL binlog architecture и форматы (Lesson 01)
  • GTID mode для failover resilience (Lesson 02)
  • Binlog retention и heartbeat (Lesson 03)
  • Connector configuration (Lesson 04)
  • PostgreSQL vs MySQL comparison (Lesson 05)
  • Schema history topic internals (Lesson 06)
  • Aurora-specific configuration (Lessons 07-09)
  • Monitoring и lag metrics (Lesson 10)
  • GTID failover procedures (Lesson 11)
  • Incremental snapshots (Lesson 12)
  • Recovery procedures (Lesson 13) ← We are here

Next Module: Advanced Topics — Multi-connector deployments, online DDL integration (gh-ost/pt-osc), и GTID migration strategies.

Вы готовы к production MySQL CDC deployments с confidence в recovery capabilities.

Check Your Understanding

Score: 0 of 0
Applied
Question 1 of 4. Коннектор MySQL CDC падает при старте с ошибкой: "The connector is trying to read binlog starting at binlog file 'mysql-bin.000134', but this file no longer exists on the server". Какова корневая причина этой ошибки?

Finished the lesson?

Mark it as complete to track your progress