Skip to content
Learning Platform
Advanced
30 minutes
mysql debezium schema-history recovery kafka

Prerequisites:

  • module-8/05-binlog-wal-comparison

Schema History Topic: Критический Компонент Recovery

Если вы прошли Модуль 2, вы знаете, что PostgreSQL connector не требует дополнительной конфигурации для хранения схем — все метаданные о структуре таблиц встроены в WAL события. MySQL работает иначе.

Schema history topic — это специальный Kafka топик, где Debezium MySQL connector хранит всю историю DDL-изменений (CREATE TABLE, ALTER TABLE) с привязкой к binlog позициям. Это память коннектора, без которой он не может восстановиться после перезапуска.

В этом уроке мы погрузимся в архитектуру schema history, разберём критические требования к retention, изучим сценарии восстановления при потере данных и научимся мониторить здоровье этого топика.

Note

PostgreSQL vs MySQL: PostgreSQL не нуждается в schema history topic, потому что logical decoding события содержат полную информацию о структуре таблиц. MySQL binlog содержит только TABLE_MAP события с table_id — коннектор должен самостоятельно восстанавливать схему.

Почему MySQL нуждается в Schema History?

Разберём фундаментальное различие между PostgreSQL и MySQL CDC.

PostgreSQL: Схема в каждом событии

// PostgreSQL WAL событие (через pgoutput)
{
  "schema": {
    "type": "struct",
    "fields": [
      {"field": "id", "type": "int64"},
      {"field": "name", "type": "string"},
      {"field": "email", "type": "string"}
    ]
  },
  "payload": {
    "before": null,
    "after": {"id": 1, "name": "Alice", "email": "[email protected]"}
  }
}

Каждое событие содержит полную информацию о схеме. Даже если connector перезапустится, он может декодировать события без дополнительного контекста.

MySQL: TABLE_MAP только с table_id

// MySQL binlog событие (ROW формат)
TABLE_MAP_EVENT:
  table_id: 108
  database: inventory
  table: customers
  (схема НЕ включена)

WRITE_ROWS_EVENT:
  table_id: 108
  row_data: [1, "Alice", "[email protected]"]

Проблема: WRITE_ROWS_EVENT содержит только сырые байты данных и table_id. Чтобы интерпретировать эти байты, коннектор должен знать:

  • Сколько колонок в таблице?
  • Какие типы данных?
  • Какие имена колонок?

Откуда коннектор получает эту информацию? Из schema history topic.

Schema History Topic Flow
DDL Event
binlog
Debezium Connector
records
Schema History Topic
stores
Stored Schema
Содержимое Schema History Topic
positionfile:pos, gtid
ddlSQL statement
tableChangescolumns, types
sourcemetadata
Критическое требование
retention.ms = -1 (бесконечный)
Без infinite retention через 7 дней recovery fails.
Единственное решение — полный resnapshot базы данных.

Вывод: Schema history topic — это память коннектора о DDL-изменениях. Без него MySQL connector не может восстановиться после перезапуска.

Как работает Schema History?

Разберём жизненный цикл schema history от initial snapshot до recovery.

Фаза 1: Initial Snapshot

При первом запуске коннектор делает полный снимок таблиц и записывает их DDL в schema history topic.

-- Текущая схема базы данных
CREATE TABLE customers (
  id INT PRIMARY KEY,
  name VARCHAR(255),
  email VARCHAR(255)
);

CREATE TABLE orders (
  id INT PRIMARY KEY,
  customer_id INT,
  created_at DATETIME
);

Что записывается в schema history topic:

// Сообщение 1: Database structure
{
  "position": {"file": "mysql-bin.000001", "pos": 154},
  "databaseName": "inventory",
  "ddl": "CREATE DATABASE IF NOT EXISTS `inventory`",
  "timestamp": 1706745600000
}

// Сообщение 2: Table structure
{
  "position": {"file": "mysql-bin.000001", "pos": 234},
  "databaseName": "inventory",
  "ddl": "CREATE TABLE `customers` (
    `id` int NOT NULL,
    `name` varchar(255),
    `email` varchar(255),
    PRIMARY KEY (`id`)
  )",
  "timestamp": 1706745601000
}

// Сообщение 3: Another table
{
  "position": {"file": "mysql-bin.000001", "pos": 456},
  "databaseName": "inventory",
  "ddl": "CREATE TABLE `orders` (...)",
  "timestamp": 1706745602000
}

Фаза 2: Streaming Changes (DDL записывается автоматически)

Когда приложение выполняет DDL операцию, Debezium автоматически записывает её в schema history.

-- DBA выполняет миграцию
ALTER TABLE customers ADD COLUMN phone VARCHAR(20);

Debezium детектирует DDL из binlog:

QUERY_EVENT:
  database: inventory
  query: ALTER TABLE customers ADD COLUMN phone VARCHAR(20)
  binlog_position: (mysql-bin.000003, 1024)

Запись в schema history topic:

{
  "position": {"file": "mysql-bin.000003", "pos": 1024},
  "databaseName": "inventory",
  "ddl": "ALTER TABLE `customers` ADD COLUMN `phone` varchar(20)",
  "timestamp": 1706755600000
}

Фаза 3: Connector Restart (Replay DDL History)

При перезапуске коннектор должен восстановить схему всех таблиц на момент сохранённого offset.

Schema Recovery Decision Tree
Connector Restart
Read Saved Offset
Read Schema History Topic
DDL History Complete?
YES
Apply DDL Sequentially
Resume CDC
NO
Schema History Missing
Resnapshot Required
Normal Recovery
1-5 мин
Чтение schema history + resume CDC
Resnapshot Recovery
2-4 часа
Для 100GB таблицы (зависит от размера)

Ключевой момент: Коннектор читает schema history topic от начала до конца, применяя DDL последовательно до сохранённого binlog offset. Это даёт ему точное состояние схемы на момент остановки.

Schema History Topic Configuration

Теперь разберём, как правильно настроить schema history topic для production.

Обязательные параметры

{
  "name": "mysql-inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",

    // Обязательно: имя топика для schema history
    "schema.history.internal.kafka.topic": "schema-changes.mysql-inventory",

    // Обязательно: Kafka bootstrap servers
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",

    // Остальная конфигурация...
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.server.id": "184054",
    "database.server.name": "mysql-server",
    "table.include.list": "inventory.*"
  }
}

Важно:

  • Используйте уникальное имя топика для каждого коннектора
  • Если коннекторов несколько, НЕ используйте общий schema history topic

Опциональные параметры (recovery tuning)

{
  "config": {
    // ... обязательные параметры ...

    // Попытки восстановления при старте (default: 4)
    "schema.history.internal.kafka.recovery.attempts": "10",

    // Интервал между попытками (default: 500ms)
    "schema.history.internal.kafka.recovery.poll.interval.ms": "1000",

    // Timeout для чтения истории (default: 3000ms)
    "schema.history.internal.consumer.sasl.login.refresh.timeout.ms": "10000"
  }
}

Когда нужны эти параметры:

  • Если Kafka cluster временно недоступен при старте коннектора
  • Если schema history topic большой и требует времени на чтение
  • Для более надёжного recovery в нестабильной сети
Tip

Naming convention: Используйте префикс schema-changes.{connector-name} для ясности. Примеры: schema-changes.mysql-inventory, schema-changes.aurora-production.

КРИТИЧНО: Retention Configuration

Это самая частая причина катастрофических сбоев Debezium MySQL connector в production.

Danger

7-дневная бомба замедленного действия

По умолчанию Confluent Kafka использует log.retention.ms=604800000 (7 дней). Если вы не изменили retention для schema history topic, через 7 дней старые DDL записи будут удалены.

Что произойдёт:

  • Коннектор работает нормально первые 7 дней
  • На 8-й день retention policy удаляет старые сообщения
  • Коннектор перезапускается (deploy, failover, restart)
  • ОШИБКА: “The db history topic or its content is fully or partially missing”
  • Коннектор НЕ может восстановиться — требуется полный resnapshot

Правильная конфигурация: Infinite Retention

Schema history topic ВСЕГДА должен иметь бесконечный retention.

# Создайте schema history topic ДО деплоя коннектора
kafka-topics --bootstrap-server kafka:9092 \
  --create \
  --topic schema-changes.mysql-inventory \
  --partitions 1 \
  --replication-factor 3 \
  --config retention.ms=-1 \
  --config retention.bytes=-1

# Проверьте настройки
kafka-topics --bootstrap-server kafka:9092 \
  --describe \
  --topic schema-changes.mysql-inventory

Ожидаемый вывод:

Topic: schema-changes.mysql-inventory
PartitionCount: 1
ReplicationFactor: 3
Configs: retention.ms=-1,retention.bytes=-1

Если топик уже существует с неправильным retention

# Изменить retention на существующем топике
kafka-configs --bootstrap-server kafka:9092 \
  --entity-type topics \
  --entity-name schema-changes.mysql-inventory \
  --alter \
  --add-config retention.ms=-1,retention.bytes=-1

# Проверить изменения
kafka-configs --bootstrap-server kafka:9092 \
  --entity-type topics \
  --entity-name schema-changes.mysql-inventory \
  --describe
Warning

Если retention уже удалил сообщения, изменение конфигурации НЕ вернёт их обратно. Потребуется resnapshot.

Проверка знаний
Как коннектор Debezium использует schema history topic при перезапуске?
Ответ
При перезапуске коннектор читает schema history topic от начала до конца, последовательно применяя все DDL-операции (CREATE TABLE, ALTER TABLE) до сохранённого binlog offset. Это позволяет восстановить точное состояние схемы всех таблиц на момент остановки.

Topic Requirements (Anti-Patterns)

Schema history topic имеет строгие требования к конфигурации.

ТребованиеЗначениеПочемуЧто будет, если нарушить
Partitions1 (ОДНА партиция)DDL события должны читаться в строгом порядкеСхема будет применяться в неправильном порядке → corruption
Retention time-1 (бесконечный)Коннектор читает ВСЮ историю от началаRecovery fails после 7 дней (default retention)
Retention bytes-1 (бесконечный)Топик может вырасти до нескольких MBTopic purge → recovery fails
Cleanup policydelete (НЕ compact)Нужна полная история, не последнее состояниеCompaction удалит промежуточные DDL → schema mismatch
Replication factor≥ 3 (production)Отказоустойчивость топикаПотеря broker → потеря истории → resnapshot

Команда для создания с правильными настройками:

kafka-topics --bootstrap-server kafka:9092 \
  --create \
  --topic schema-changes.mysql-inventory \
  --partitions 1 \
  --replication-factor 3 \
  --config retention.ms=-1 \
  --config retention.bytes=-1 \
  --config cleanup.policy=delete \
  --config min.insync.replicas=2
Danger

Anti-pattern: Shared Schema History Topic

Никогда не используйте один schema history topic для нескольких коннекторов. Каждый коннектор должен иметь свой уникальный топик.

Почему: Коннектор A прочитает DDL от коннектора B → schema mismatch → corrupt events.

Что хранится в Schema History Topic?

Разберём внутренний формат записей для понимания механизма recovery.

Формат сообщения

Каждая DDL операция записывается как JSON сообщение (не Avro).

{
  "source": {
    "version": "2.5.0.Final",
    "connector": "mysql",
    "name": "mysql-server",
    "ts_ms": 1706745600000,
    "snapshot": "false",
    "db": "inventory",
    "table": "customers"
  },
  "position": {
    "transaction_id": null,
    "ts_sec": 1706745600,
    "file": "mysql-bin.000003",
    "pos": 1024,
    "gtids": "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-150",
    "server_id": 1
  },
  "databaseName": "inventory",
  "schemaName": null,
  "ddl": "ALTER TABLE `customers` ADD COLUMN `phone` varchar(20) AFTER `email`",
  "tableChanges": [
    {
      "type": "ALTER",
      "id": "\"inventory\".\"customers\"",
      "table": {
        "defaultCharsetName": "utf8mb4",
        "primaryKeyColumnNames": ["id"],
        "columns": [
          {"name": "id", "jdbcType": 4, "typeName": "INT"},
          {"name": "name", "jdbcType": 12, "typeName": "VARCHAR", "length": 255},
          {"name": "email", "jdbcType": 12, "typeName": "VARCHAR", "length": 255},
          {"name": "phone", "jdbcType": 12, "typeName": "VARCHAR", "length": 20}
        ]
      }
    }
  ]
}

Ключевые поля:

  1. position: Binlog позиция, на которой произошло DDL

    • file + pos — для file:offset tracking
    • gtids — для GTID tracking
  2. ddl: Полный SQL текст DDL операции

    • Используется для человека (логирование, debugging)
  3. tableChanges: Структурированное описание схемы

    • Используется Debezium для декодирования row events
    • Содержит полные метаданные колонок (типы, длина, constraints)

Пример последовательности событий

# Читаем schema history topic
kafka-console-consumer --bootstrap-server kafka:9092 \
  --topic schema-changes.mysql-inventory \
  --from-beginning \
  --property print.key=true

Вывод (упрощённо):

null    {"ddl":"CREATE DATABASE `inventory`","position":{"file":"mysql-bin.000001","pos":154}}
null    {"ddl":"CREATE TABLE `customers` (id INT, name VARCHAR(255))","position":{"file":"mysql-bin.000001","pos":234}}
null    {"ddl":"CREATE TABLE `orders` (...)","position":{"file":"mysql-bin.000001","pos":456}}
null    {"ddl":"ALTER TABLE `customers` ADD COLUMN `email` VARCHAR(255)","position":{"file":"mysql-bin.000002","pos":789}}
null    {"ddl":"ALTER TABLE `customers` ADD COLUMN `phone` VARCHAR(20)","position":{"file":"mysql-bin.000003","pos":1024}}

При restart коннектор:

  1. Читает все 5 сообщений от начала
  2. Применяет DDL последовательно
  3. Получает текущую схему: customers (id, name, email, phone)
  4. Возобновляет чтение binlog с сохранённого offset

Recovery Scenarios

Разберём типичные сценарии сбоев и процедуры восстановления.

Scenario A: Normal Connector Restart ✅

Что происходит:

  • Connector останавливается (deploy, reschedule, config update)
  • Connector стартует заново

Процесс recovery:

Причины повреждения Schema History
!Manual Editing
!Network Failure
!Shared Topic
Recovery Flow при Corruption
Corruption Detected
Step 1
Stop Connector
Step 2
Delete Corrupted Topic
Step 3
Create New Topic
Step 4
Recreate Connector
Предотвращение Corruption
Unique topic per connector
Infinite retention (-1)
Regular backups

Действия администратора: Никаких. Recovery автоматический.

Время recovery: Секунды (зависит от размера schema history topic)


Scenario B: Schema History Topic Partially Purged ⚠️

Симптом:

ERROR: The db history topic 'schema-changes.mysql-inventory' is fully or partially missing.

Причина:

  • Default Kafka retention (7 дней) удалил старые DDL записи
  • Connector не может восстановить схему с initial snapshot

Диагностика:

# Проверить retention settings
kafka-configs --bootstrap-server kafka:9092 \
  --entity-type topics \
  --entity-name schema-changes.mysql-inventory \
  --describe

# Если retention.ms != -1, это причина

Recovery Option 1: Resnapshot (МЕДЛЕННО)

Warning

Resnapshot может занять часы для больших таблиц

Для базы данных с таблицей на 100GB, initial snapshot может выполняться 2-4 часа. Учитывайте downtime.

# 1. Удалить коннектор
curl -X DELETE http://localhost:8083/connectors/mysql-inventory-connector

# 2. Удалить schema history topic
kafka-topics --bootstrap-server kafka:9092 \
  --delete \
  --topic schema-changes.mysql-inventory

# 3. Создать топик с правильным retention
kafka-topics --bootstrap-server kafka:9092 \
  --create \
  --topic schema-changes.mysql-inventory \
  --partitions 1 \
  --replication-factor 3 \
  --config retention.ms=-1 \
  --config retention.bytes=-1

# 4. Удалить Kafka Connect offsets (опционально)
# Это заставит коннектор начать с snapshot.mode=initial
kafka-console-consumer --bootstrap-server kafka:9092 \
  --topic connect-offsets \
  --from-beginning \
  --property print.key=true | grep mysql-inventory-connector
# (manually delete offset records via Kafka Connect REST API)

# 5. Пересоздать коннектор с snapshot.mode=initial
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "mysql-inventory-connector",
    "config": {
      "snapshot.mode": "initial",
      ...
    }
  }'

Recovery Option 2: Restore from Backup (БЫСТРО)

Если у вас есть backup schema history topic (см. секцию “Backup Procedures”):

# 1. Удалить повреждённый топик
kafka-topics --bootstrap-server kafka:9092 \
  --delete \
  --topic schema-changes.mysql-inventory

# 2. Создать новый топик
kafka-topics --bootstrap-server kafka:9092 \
  --create \
  --topic schema-changes.mysql-inventory \
  --partitions 1 \
  --replication-factor 3 \
  --config retention.ms=-1

# 3. Восстановить из backup
kafka-console-producer --bootstrap-server kafka:9092 \
  --topic schema-changes.mysql-inventory \
  < schema-history-backup-2026-01-31.json

# 4. Перезапустить коннектор
curl -X POST http://localhost:8083/connectors/mysql-inventory-connector/restart

Время recovery: 1-5 минут (vs часы для resnapshot)


Scenario C: Schema History Topic Corrupted 💥

Симптом:

  • Events содержат неправильные колонки
  • Schema parsing errors в логах Debezium
  • Events с данными, не соответствующими текущей схеме
ERROR: Column count doesn't match value count at row 1
ERROR: Unknown column 'old_column_name' in field list

Причина:

  • Ручное редактирование schema history topic
  • Сетевые сбои во время записи DDL
  • Kafka broker corruption

Диагностика:

# Прочитать schema history topic
kafka-console-consumer --bootstrap-server kafka:9092 \
  --topic schema-changes.mysql-inventory \
  --from-beginning

# Проверить на:
# - Неполные JSON сообщения
# - Сообщения с неправильными binlog positions
# - Дублирующиеся DDL операции

Recovery (ТОЛЬКО resnapshot):

# 1. Остановить коннектор
curl -X DELETE http://localhost:8083/connectors/mysql-inventory-connector

# 2. Удалить повреждённый schema history topic
kafka-topics --bootstrap-server kafka:9092 \
  --delete \
  --topic schema-changes.mysql-inventory

# 3. Создать новый топик с правильной конфигурацией
kafka-topics --bootstrap-server kafka:9092 \
  --create \
  --topic schema-changes.mysql-inventory \
  --partitions 1 \
  --replication-factor 3 \
  --config retention.ms=-1 \
  --config retention.bytes=-1

# 4. Пересоздать коннектор с snapshot.mode=initial
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @mysql-connector-config.json

Scenario D: Multiple Connectors Sharing Topic 🚫

Danger

Anti-pattern: Shared schema history topic

Это критическая ошибка конфигурации, которая приводит к непредсказуемым сбоям.

Симптом:

  • Random schema errors (временные, непостоянные)
  • Connector A видит DDL от Connector B
  • Events с колонками, которые не существуют в таблице
ERROR: Column 'customer_id' doesn't exist in table 'products'
(Connector A прочитал DDL от Connector B, который работает с другой базой)

Причина:

  • Два коннектора используют одинаковое имя schema history topic
  • Или используется topic.prefix без уникального schema history topic

Неправильная конфигурация:

// Connector A
{
  "name": "mysql-connector-a",
  "config": {
    "schema.history.internal.kafka.topic": "schema-changes.shared"  // ❌ WRONG
  }
}

// Connector B
{
  "name": "mysql-connector-b",
  "config": {
    "schema.history.internal.kafka.topic": "schema-changes.shared"  // ❌ WRONG
  }
}

Правильная конфигурация:

// Connector A
{
  "name": "mysql-connector-a",
  "config": {
    "schema.history.internal.kafka.topic": "schema-changes.mysql-connector-a"  // ✅ UNIQUE
  }
}

// Connector B
{
  "name": "mysql-connector-b",
  "config": {
    "schema.history.internal.kafka.topic": "schema-changes.mysql-connector-b"  // ✅ UNIQUE
  }
}

Recovery:

# 1. Остановить оба коннектора
curl -X DELETE http://localhost:8083/connectors/mysql-connector-a
curl -X DELETE http://localhost:8083/connectors/mysql-connector-b

# 2. Удалить shared schema history topic
kafka-topics --bootstrap-server kafka:9092 \
  --delete \
  --topic schema-changes.shared

# 3. Создать уникальные топики для каждого коннектора
kafka-topics --bootstrap-server kafka:9092 \
  --create \
  --topic schema-changes.mysql-connector-a \
  --partitions 1 --replication-factor 3 --config retention.ms=-1

kafka-topics --bootstrap-server kafka:9092 \
  --create \
  --topic schema-changes.mysql-connector-b \
  --partitions 1 --replication-factor 3 --config retention.ms=-1

# 4. Пересоздать коннекторы с правильными настройками и snapshot.mode=initial
curl -X POST http://localhost:8083/connectors -d @connector-a-config.json
curl -X POST http://localhost:8083/connectors -d @connector-b-config.json

Monitoring Schema History Health

Проактивный мониторинг schema history topic предотвращает катастрофы.

Метрики для отслеживания

МетрикаЧто мониторитьНормальное поведениеAlert threshold
Topic sizeРазмер топика в MBРастёт со временем (DDL добавляются)Внезапное падение до 0
Message countКоличество DDL сообщенийМедленный рост (редкие DDL)Внезапное уменьшение
Consumer lagКоннектор читает топик0 (коннектор читает при старте)Lag > 0 при старте
Retention configretention.ms-1 (infinite)Любое значение ≠ -1

Kafka Commands для мониторинга

# 1. Проверить retention settings
kafka-configs --bootstrap-server kafka:9092 \
  --entity-type topics \
  --entity-name schema-changes.mysql-inventory \
  --describe | grep retention

# Ожидается: retention.ms=-1, retention.bytes=-1

# 2. Проверить размер топика
kafka-log-dirs --bootstrap-server kafka:9092 \
  --topic-list schema-changes.mysql-inventory \
  --describe | grep size

# 3. Подсчитать сообщения в топике
kafka-run-class kafka.tools.GetOffsetShell \
  --bootstrap-server kafka:9092 \
  --topic schema-changes.mysql-inventory | \
  awk -F: '{sum += $3} END {print "Total messages:", sum}'

# 4. Проверить, что топик не compacted
kafka-topics --bootstrap-server kafka:9092 \
  --describe \
  --topic schema-changes.mysql-inventory | grep cleanup.policy

# Ожидается: cleanup.policy=delete (NOT compact)

Alerts для настройки

# Prometheus AlertManager rule (пример)
groups:
  - name: debezium_schema_history
    rules:
      - alert: SchemaHistoryRetentionIncorrect
        expr: |
          kafka_topic_config{topic=~"schema-changes.*",key="retention.ms"} != -1
        for: 5m
        annotations:
          summary: "Schema history topic has finite retention (CRITICAL)"

      - alert: SchemaHistoryTopicSizeDecreased
        expr: |
          delta(kafka_log_size{topic=~"schema-changes.*"}[1h]) < 0
        for: 5m
        annotations:
          summary: "Schema history topic size decreased (possible purge)"

      - alert: SchemaHistoryPartitionCountWrong
        expr: |
          kafka_topic_partitions{topic=~"schema-changes.*"} != 1
        for: 5m
        annotations:
          summary: "Schema history topic has multiple partitions (MISCONFIGURED)"
Tip

Proactive monitoring saves hours of downtime

Настройте alert на retention.ms != -1 в первый же день. Это предотвратит 7-дневную бомбу.

Backup and Restore Procedures

Backup schema history topic значительно ускоряет recovery.

Почему backup критичен?

Сценарий без backup:

  • Schema history topic purged или corrupted
  • Единственный вариант: full resnapshot
  • Для 100GB таблицы: 2-4 часа downtime

Сценарий с backup:

  • Restore schema history topic из backup
  • Restart коннектора
  • Recovery за 1-5 минут

Как создать backup

# Экспорт schema history topic в файл
kafka-console-consumer --bootstrap-server kafka:9092 \
  --topic schema-changes.mysql-inventory \
  --from-beginning \
  --property print.key=true \
  --property print.timestamp=true \
  --timeout-ms 10000 \
  > schema-history-backup-$(date +%Y-%m-%d).json

# Пример вывода файла:
# CreateTime:1706745600000    null    {"ddl":"CREATE TABLE ...","position":{...}}
# CreateTime:1706745601000    null    {"ddl":"ALTER TABLE ...","position":{...}}

Рекомендация по частоте backup:

  • Daily backup для production коннекторов
  • После каждого DDL изменения (для критических систем)
  • Retention: Храните backups минимум 30 дней

Автоматизация с cron:

#!/bin/bash
# /usr/local/bin/backup-schema-history.sh

KAFKA_BROKER="kafka:9092"
TOPIC="schema-changes.mysql-inventory"
BACKUP_DIR="/backups/debezium-schema-history"
DATE=$(date +%Y-%m-%d)

mkdir -p "$BACKUP_DIR"

kafka-console-consumer --bootstrap-server "$KAFKA_BROKER" \
  --topic "$TOPIC" \
  --from-beginning \
  --property print.key=true \
  --timeout-ms 10000 \
  > "$BACKUP_DIR/schema-history-$DATE.json"

# Compress старых backups (опционально)
gzip "$BACKUP_DIR/schema-history-$DATE.json"

# Удалить backups старше 30 дней
find "$BACKUP_DIR" -name "schema-history-*.json.gz" -mtime +30 -delete

Cron schedule (ежедневно в 2 AM):

0 2 * * * /usr/local/bin/backup-schema-history.sh

Как восстановить из backup

# 1. Создать новый schema history topic (если удалён)
kafka-topics --bootstrap-server kafka:9092 \
  --create \
  --topic schema-changes.mysql-inventory \
  --partitions 1 \
  --replication-factor 3 \
  --config retention.ms=-1 \
  --config retention.bytes=-1

# 2. Распаковать backup (если gzipped)
gunzip schema-history-backup-2026-01-31.json.gz

# 3. Восстановить сообщения в топик
# ВАЖНО: Удалите timestamps и keys из backup файла перед restore
cat schema-history-backup-2026-01-31.json | \
  awk -F'\t' '{print $3}' | \
  kafka-console-producer --bootstrap-server kafka:9092 \
    --topic schema-changes.mysql-inventory

# 4. Проверить, что сообщения загружены
kafka-run-class kafka.tools.GetOffsetShell \
  --bootstrap-server kafka:9092 \
  --topic schema-changes.mysql-inventory

# 5. Перезапустить коннектор
curl -X POST http://localhost:8083/connectors/mysql-inventory-connector/restart
Warning

Restore must use exact same topic name

Schema history topic name hardcoded в коннекторе. Если вы restore в топик с другим именем, коннектор не найдёт историю.

Проверка знаний
Почему нельзя использовать один schema history topic для нескольких Debezium коннекторов?
Ответ
Каждый коннектор записывает DDL-операции своих таблиц в schema history topic. Если два коннектора используют общий topic, коннектор A прочитает DDL от коннектора B при восстановлении, что приведёт к несоответствию схем (schema mismatch) и повреждению событий.

Schema Evolution Best Practices

Как безопасно выполнять DDL изменения в production CDC пайплайне.

Compatible Schema Changes (Seamless)

Эти DDL операции НЕ ломают CDC пайплайн:

-- ✅ ADD COLUMN (nullable или с default)
ALTER TABLE customers ADD COLUMN phone VARCHAR(20);
ALTER TABLE customers ADD COLUMN created_at DATETIME DEFAULT CURRENT_TIMESTAMP;

-- ✅ ADD INDEX (не влияет на CDC)
CREATE INDEX idx_email ON customers(email);

-- ✅ RENAME COLUMN (Debezium видит новое имя)
ALTER TABLE customers RENAME COLUMN name TO full_name;

Что происходит:

  1. MySQL выполняет DDL
  2. Debezium записывает DDL в schema history topic
  3. Последующие события используют новую схему
  4. Consumers получают events с новыми колонками

No downtime, no coordination needed.

Incompatible Schema Changes (Requires Coordination)

Эти DDL операции могут сломать consumers:

-- ⚠️ DROP COLUMN
ALTER TABLE customers DROP COLUMN email;

-- ⚠️ CHANGE COLUMN TYPE
ALTER TABLE customers MODIFY COLUMN id BIGINT;

-- ⚠️ RENAME TABLE
RENAME TABLE customers TO users;

Проблема: Consumers ожидают старую схему.

Best practice workflow:

1. Deploy new consumer version (supports both old and new schema)
2. Wait for all consumers to upgrade
3. Execute DDL in MySQL
4. Debezium automatically propagates schema change
5. Old consumer version gracefully handles missing column (if coded defensively)
6. After migration period, deploy final consumer version (only new schema)
Tip

Test schema changes in staging first

Всегда тестируйте DDL миграции в staging environment с полным CDC пайплайном. Это выявит проблемы совместимости до production deploy.

Handling Large DDL Operations

-- Операция, которая может занять минуты/часы
ALTER TABLE large_table ADD COLUMN metadata JSON;

Проблемы:

  • MySQL блокирует таблицу на время DDL (зависит от версии и типа операции)
  • Debezium может видеть timeout если операция слишком долгая
  • Binlog position не двигается во время DDL

Рекомендации:

  1. Используйте Online DDL (MySQL 8.0+)

    ALTER TABLE large_table
      ADD COLUMN metadata JSON,
      ALGORITHM=INPLACE, LOCK=NONE;
  2. Для старых MySQL версий: pt-online-schema-change

    pt-online-schema-change --alter "ADD COLUMN metadata JSON" \
      D=inventory,t=large_table \
      --execute
  3. Monitor Debezium during DDL

    • Watch connector status: curl http://localhost:8083/connectors/mysql-inventory-connector/status
    • Check for timeout errors in logs

Ключевые выводы

  1. Schema history topic — это память MySQL CDC коннектора — без него невозможен recovery после restart
  2. PostgreSQL не нуждается в schema history — WAL события self-contained, MySQL binlog требует TABLE_MAP reconstruction
  3. Retention MUST be infiniteretention.ms=-1 и retention.bytes=-1 обязательны, иначе 7-дневная бомба
  4. Single partition required — DDL события должны применяться в строгом порядке
  5. Unique topic per connector — sharing schema history topic между коннекторами ведёт к corruption
  6. Backup schema history topic — restore за минуты vs resnapshot за часы
  7. Monitor retention config — alert на retention.ms != -1 предотвращает катастрофы
  8. Compatible DDL seamless — ADD COLUMN, ADD INDEX автоматически propagate
  9. Incompatible DDL requires coordination — DROP COLUMN, RENAME TABLE нужна синхронизация с consumers
  10. Test DDL changes in staging — полный CDC пайплайн должен быть протестирован до production
  11. Never manually edit schema history topic — corruption ведёт к mandatory resnapshot
  12. Schema history NOT compactedcleanup.policy=delete, нужна полная история

Что дальше?

Мы разобрали критическую роль schema history topic для MySQL CDC recovery. Теперь вы знаете, как правильно настроить retention, восстановиться из backup и мониторить здоровье топика.

В следующих уроках Модуля 8 мы перейдём к AWS Aurora MySQL — managed service с собственными особенностями CDC:

  • Параметр-группы Aurora для binlog конфигурации
  • Ограничения привилегий в RDS/Aurora
  • Read replica поддержка для CDC
  • Aurora-specific monitoring и troubleshooting

Aurora MySQL использует те же принципы schema history, но имеет уникальные ограничения managed services. Prepare to dive deep.

Check Your Understanding

Score: 0 of 0
Conceptual
Question 1 of 4. Почему MySQL коннектору Debezium необходим schema history topic, тогда как PostgreSQL коннектор обходится без него?

Finished the lesson?

Mark it as complete to track your progress