Skip to content
Learning Platform
Advanced
35 minutes
debezium mysql aurora snapshot signal-table incremental

Prerequisites:

  • module-8/09-aurora-snapshot-modes

Incremental Snapshots через Signal Table

Когда Initial Snapshot недостаточно

В предыдущих уроках мы изучили initial snapshot — механизм первоначальной загрузки данных при старте Debezium connector. Этот подход работает отлично для первого запуска, но не подходит для многих production сценариев:

Ограничения Initial Snapshot

Проблема 1: Требует перезапуска connector

  • Initial snapshot выполняется только при первом запуске (snapshot.mode=initial)
  • Чтобы resnapshot таблицу, нужно остановить и удалить connector, потерять offset
  • Downtime для всего CDC pipeline на время resnapshot

Проблема 2: All-or-nothing подход

  • Snapshot захватывает все таблицы из table.include.list
  • Нельзя resnapshot одну конкретную таблицу, не затрагивая остальные
  • Если таблица A нуждается в resnapshot, придется resnapshot и таблицы B, C, D

Проблема 3: Невозможность snapshot новых таблиц

  • Добавление таблицы в table.include.list требует connector restart
  • Restart триггерит snapshot всех таблиц, включая те, что уже были synced

Реальные сценарии, требующие on-demand snapshot

Сценарий 1: Добавление новой таблицы

Timeline:
- Day 1: Connector запущен с таблицами customers, orders
- Day 30: Разработчики создали новую таблицу products
- Task: Добавить products к CDC без resnapshot customers, orders

Сценарий 2: Восстановление после extended downtime

Scenario:
- Connector был остановлен на 10 дней (technical issue)
- Binlog retention = 7 дней (168 часов)
- Результат: Binlog files для дней 1-3 purged
- Task: Resnapshot данные, потерянные из-за purge

Сценарий 3: Data inconsistency recovery

Issue:
- Consumer application имел bug
- Неправильно обработал UPDATE события для таблицы orders
- Target database (Elasticsearch, PostgreSQL) содержит corrupted данные
- Task: Resnapshot orders для восстановления consistency

Incremental snapshot решает все эти проблемы.

Initial vs Incremental Snapshot: Архитектурное сравнение

Incremental Snapshot: Non-Blocking Flow
Signal Received
execute-snapshot
trigger
Start Snapshot
Select Chunk Range
WHERE id >= N AND id < N+2048
Read Chunk
2048 rows
Emit Chunk Events
op='r'
+
Stream Binlog
op='c/u/d'
Next Chunk
id >= 2049
repeat
Snapshot Complete
Ключевое преимущество: Параллельная обработкаRecommended
НЕ БЛОКИРУЕТ:Streaming и snapshot работают параллельно.
Initial snapshot блокирует binlog streaming до завершения. Incremental snapshot — нет.
Consumer различает события по полю source.snapshot:
"incremental" для snapshot, "false" для streaming.
Initial Snapshot
  • - Блокирует binlog streaming
  • - Все таблицы сразу
  • - Не resumable при crash
  • - Требует restart connector
Incremental Snapshot
  • + Параллельно с streaming
  • + Конкретные таблицы on-demand
  • + Resumable при crash
  • + Через signal table (без restart)

Ключевые отличия

ХарактеристикаInitial SnapshotIncremental Snapshot
ТриггерConnector start (snapshot.mode=initial)SQL INSERT в signal table (on-demand)
Connector restart✅ Требуется❌ Не требуется
ScopeВсе таблицы из table.include.listКонкретные таблицы (указываются в signal)
Метод чтенияFull table scan за одну транзакциюChunk-based (default: 2048 rows/chunk)
Параллельность с CDC❌ Binlog streaming blocked до завершения snapshot✅ Binlog streaming продолжается параллельно
LocksTable locks (minimal mode) или no locks (none mode)No locks (chunk reading через SELECT with PK range)
Resume after failure❌ Restart from scratch✅ Resume from last completed chunk
Use CaseПервый запуск connectorOn-demand resnapshot, добавление таблиц, recovery

Почему incremental snapshot называется “incremental”?

  • Данные читаются chunk by chunk (incremental progress)
  • Каждый chunk обрабатывается независимо, между chunks Debezium обрабатывает binlog events
  • Resumable: Если snapshot прерывается, Debezium знает, какие chunks уже прочитаны
Проверка знаний
В чём ключевое архитектурное отличие incremental snapshot от initial snapshot в Debezium?
Ответ
Initial snapshot блокирует binlog streaming до завершения, требует перезапуска connector и захватывает все таблицы целиком. Incremental snapshot читает данные порциями (chunks по 2048 строк), между chunks обрабатывает binlog events параллельно, не требует перезапуска connector, позволяет выбрать конкретные таблицы и поддерживает возобновление после сбоя.

Signal Table: Механизм управления Debezium

Incremental snapshot триггируется через signal table — специальную служебную таблицу, которую Debezium мониторит для получения команд.

Архитектура Signal Table

Signal Table Schema
debezium_signal
CREATE TABLE debezium_signal (
id VARCHAR(36) NOT NULL,-- UUID
type VARCHAR(32) NOT NULL,-- execute-snapshot
data TEXT,-- JSON payload
PRIMARY KEY (id)-- CRITICAL!
);
Signal Mechanism
INSERT signal
Engineer
binlog
Binlog Event
MySQL
parse
Execute Snapshot
Debezium
Execute Snapshot
INSERT INTO debezium_signal
(id, type, data) VALUES (
UUID(),
'execute-snapshot',
'{"data-collections":
  ["inventory.orders"]}'
);
Stop Snapshot
INSERT INTO debezium_signal
(id, type, data) VALUES (
UUID(),
'stop-snapshot',
'{"data-collections":
  ["inventory.orders"],
  "type":"INCREMENTAL"}'
);
CRITICAL: PRIMARY KEY обязателен
БЕЗ PRIMARY KEY:Signal table не работает (silent failure).
Debezium использует PK для парсинга binlog events.
Без PK: INSERT event не распознается как signal → snapshot не запускается → нет ошибки в логах.

Проверка: SHOW CREATE TABLE debezium_signal\G
Исправление: ALTER TABLE debezium_signal ADD PRIMARY KEY (id);
Connector Configuration
// Обязательные свойства для signal table
"signal.data.collection": "inventory.debezium_signal"
"incremental.snapshot.chunk.size": "2048"
// Schema ДОЛЖНА быть в database.include.list
"database.include.list": "inventory"

Ключевое отличие от Initial Snapshot:

  • Signal table использует сам binlog для передачи команд Debezium
  • Debezium читает INSERT event в signal table как trigger для snapshot
  • Команда обрабатывается асинхронно вместе с CDC stream

Signal Table Schema

Signal table имеет фиксированную структуру, которую Debezium expects:

CREATE TABLE debezium_signal (
  id VARCHAR(36) NOT NULL,      -- Unique signal ID (обязательно PRIMARY KEY)
  type VARCHAR(32) NOT NULL,    -- Signal type: execute-snapshot, stop-snapshot, etc.
  data TEXT,                    -- JSON payload with signal parameters
  PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
Danger

КРИТИЧЕСКАЯ ОШИБКА: Signal table без PRIMARY KEY приводит к silent failure.

Что произойдет:

  1. Вы выполняете INSERT в signal table без PK
  2. Debezium capture INSERT event из binlog
  3. Debezium пытается parse event → fails silently (no error logs)
  4. Incremental snapshot не запускается, никакого сообщения об ошибке

Проверка:

SHOW CREATE TABLE debezium_signal\G

Обязательно наличие:

PRIMARY KEY (`id`)

Если PRIMARY KEY отсутствует:

ALTER TABLE debezium_signal ADD PRIMARY KEY (id);

Column requirements:

ColumnTypeRequiredPurpose
idVARCHAR(36)Unique ID (рекомендуется UUID())
typeVARCHAR(32)Signal type: execute-snapshot, stop-snapshot
dataTEXTJSON payload (может быть NULL для некоторых signals)

Создание signal table:

-- Подключиться к MySQL
mysql -h localhost -P 3307 -u root -pmysql inventory

-- Создать signal table
CREATE TABLE debezium_signal (
  id VARCHAR(36) NOT NULL,
  type VARCHAR(32) NOT NULL,
  data TEXT,
  PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- Выдать права debezium user
GRANT INSERT, UPDATE, DELETE ON inventory.debezium_signal TO 'debezium'@'%';
FLUSH PRIVILEGES;

Проверка создания:

DESCRIBE debezium_signal;

-- Expected output:
-- +-------+-------------+------+-----+---------+-------+
-- | Field | Type        | Null | Key | Default | Extra |
-- +-------+-------------+------+-----+---------+-------+
-- | id    | varchar(36) | NO   | PRI | NULL    |       |
-- | type  | varchar(32) | NO   |     | NULL    |       |
-- | data  | text        | YES  |     | NULL    |       |
-- +-------+-------------+------+-----+---------+-------+
Tip

Рекомендация: Используйте MySQL функцию UUID() для генерации уникальных IDs:

INSERT INTO debezium_signal (id, type, data)
VALUES (UUID(), 'execute-snapshot', '...');

Почему UUID()?

  • Гарантирует uniqueness даже при concurrent signals
  • Не требует manual tracking последнего ID
  • Compatible с distributed systems (несколько engineers могут отправлять signals одновременно)

Альтернативы:

  • Sequential IDs: 'signal-001', 'signal-002' (требует coordination)
  • Timestamp-based: CONCAT('sig-', UNIX_TIMESTAMP()) (может иметь collisions)
Проверка знаний
Почему signal table обязательно должна иметь PRIMARY KEY и что произойдёт без него?
Ответ
Без PRIMARY KEY signal table приводит к silent failure: Debezium захватывает INSERT event из binlog, но не может корректно разобрать его структуру, потому что ожидает PK для binlog event structure. В результате incremental snapshot никогда не запускается, при этом никаких сообщений об ошибке в логах не появляется — полностью тихий отказ.

Connector Configuration для Signal Table

Чтобы Debezium начал мониторить signal table, нужно добавить свойство signal.data.collection в connector config:

{
  "name": "mysql-inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",

    // Table filtering
    "database.include.list": "inventory",
    "table.include.list": "inventory.customers,inventory.orders,inventory.products",

    // Topic naming
    "topic.prefix": "mysql-server",

    // SIGNAL TABLE CONFIGURATION
    "signal.data.collection": "inventory.debezium_signal",

    // Incremental snapshot chunk size (optional, default: 2048)
    "incremental.snapshot.chunk.size": "2048",

    // Schema history
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.mysql-server",

    // Snapshot mode
    "snapshot.mode": "initial",

    // Other properties...
    "heartbeat.interval.ms": "30000",
    "decimal.handling.mode": "precise"
  }
}

Ключевые свойства:

1. signal.data.collection

"signal.data.collection": "inventory.debezium_signal"

Что это: Fully-qualified name signal table (format: {database}.{table}).

Важно:

  • Signal table schema ДОЛЖНА быть включена в database.include.list
  • Если signal table в другой database, добавьте database в whitelist
  • Debezium мониторит INSERT events в эту таблицу через binlog

Пример с signal table в отдельной database:

"database.include.list": "inventory,debezium_admin",
"signal.data.collection": "debezium_admin.signal_table"

2. incremental.snapshot.chunk.size

"incremental.snapshot.chunk.size": "2048"

Что это: Количество rows, читаемых за один chunk.

Как выбрать значение:

Chunk SizeWhen to UseTrade-offs
1024Таблицы с очень широкими rows (100+ columns)Медленнее, но меньше memory usage
2048Default - подходит для большинства casesBalanced performance
4096Таблицы с узкими rows (5-10 columns)Быстрее, но больше memory
8192Large tables, high-performance snapshotМожет вызвать memory pressure

Formula для оценки chunk size:

Chunk Size = (Desired Memory per Chunk MB) / (Average Row Size KB) * 1024

Пример:

  • Average row size: 5 KB
  • Desired memory per chunk: 10 MB
  • Chunk size = 10 MB / 5 KB = 2048 rows
Warning

ЧАСТАЯ ОШИБКА:

"database.include.list": "inventory",
"signal.data.collection": "inventory.debezium_signal"

Если inventory в database.include.list — ✅ работает.

НО если:

"database.include.list": "orders_db",
"signal.data.collection": "admin.debezium_signal"

Signal table в admin database, которая НЕ в whitelist → signals не будут captured.

Решение:

"database.include.list": "orders_db,admin"

Verification: После deployment connector, проверьте logs:

docker compose logs debezium-connect | grep "signal.data.collection"

Ожидаемый log:

INFO: Signal data collection set to 'inventory.debezium_signal'

Если нет этого сообщения — проверьте whitelist.

Triggering Incremental Snapshot: SQL Examples

Теперь, когда signal table настроена и connector знает о ней, можно trigger incremental snapshot через SQL INSERT.

Базовый Snapshot: Одна таблица

-- Snapshot таблицы inventory.orders
INSERT INTO inventory.debezium_signal (id, type, data)
VALUES (
  UUID(),
  'execute-snapshot',
  '{"data-collections": ["inventory.orders"]}'
);

Что происходит:

  1. Debezium capture INSERT event из binlog
  2. Parse type field: execute-snapshot
  3. Parse data JSON: data-collections = ["inventory.orders"]
  4. Start incremental snapshot для inventory.orders
  5. Chunk-based reading: SELECT * FROM orders WHERE id >= X AND id < X+2048
  6. Между chunks обрабатывает binlog events (параллельно)
  7. После завершения всех chunks: snapshot complete

Expected connector logs:

INFO: Incremental snapshot requested for data collections: [inventory.orders]
INFO: Starting incremental snapshot for table 'inventory.orders'
INFO: Snapshot window for table 'inventory.orders': [1, 2048]
INFO: Snapshot window for table 'inventory.orders': [2049, 4096]
...
INFO: Incremental snapshot completed for table 'inventory.orders'

Snapshot нескольких таблиц

-- Snapshot трех таблиц одновременно
INSERT INTO inventory.debezium_signal (id, type, data)
VALUES (
  UUID(),
  'execute-snapshot',
  '{"data-collections": ["inventory.customers", "inventory.orders", "inventory.products"]}'
);

Обработка:

  • Debezium обрабатывает таблицы последовательно (не parallel)
  • Порядок: customers → orders → products
  • Каждая таблица chunk-based
  • Binlog streaming продолжается параллельно для всех таблиц

Filtered Snapshot: Snapshot с WHERE условием

-- Snapshot только orders созданных в 2024 году
INSERT INTO inventory.debezium_signal (id, type, data)
VALUES (
  UUID(),
  'execute-snapshot',
  '{
    "data-collections": ["inventory.orders"],
    "additional-conditions": [
      {
        "data-collection": "inventory.orders",
        "filter": "created_at >= ''2024-01-01'' AND created_at < ''2025-01-01''"
      }
    ]
  }'
);

Важно:

  • filter field содержит SQL WHERE condition
  • Double quotes escaping: Используйте '' (два одинарных quote) вместо ' для литералов в JSON
  • Условие применяется к каждому chunk: SELECT * WHERE (chunk condition) AND (filter condition)

Use cases:

  • Snapshot только recent data (last 30 days)
  • Snapshot specific partition (e.g., region = ‘US-EAST’)
  • Snapshot failed rows (e.g., sync_status = ‘FAILED’)

Stop In-Progress Snapshot

-- Остановить текущий incremental snapshot
INSERT INTO inventory.debezium_signal (id, type, data)
VALUES (
  UUID(),
  'stop-snapshot',
  '{"data-collections": ["inventory.orders"], "type": "INCREMENTAL"}'
);

Когда использовать:

  • Snapshot занимает слишком долго (блокирует другие operations)
  • Discovered ошибка в filter condition (неправильные данные snapshot)
  • Need to reconfigure chunk size

Что происходит:

  • Debezium завершает текущий chunk
  • Прекращает snapshot после завершения chunk (не прерывает chunk mid-way)
  • Events уже sent to Kafka — остаются (не откатываются)

Resume snapshot:

  • Incremental snapshot НЕ resumable после stop-snapshot
  • Нужно trigger новый execute-snapshot signal

Snapshot Progress Monitoring

Incremental snapshot может длиться часы для больших таблиц. Важно мониторить прогресс.

JMX Metrics: Real-Time Status

Debezium connector expose JMX metrics для incremental snapshot:

Key Metrics:

MetricTypeMeaning
SnapshotRunningBooleantrue если incremental snapshot в процессе
SnapshotCompletedBooleantrue после завершения snapshot
SnapshotAbortedBooleantrue если snapshot stopped/failed
ChunkIdStringCurrent chunk ID (e.g., “1:2048”)
TotalTableCountIntegerКоличество таблиц в snapshot
RemainingTableCountIntegerОставшиеся таблицы

Access JMX metrics via JConsole:

# Enable JMX in Kafka Connect (docker-compose.yml)
environment:
  KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.port=9999 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"

# Connect JConsole to localhost:9999
# Navigate to: debezium.mysql:type=connector-metrics,context=snapshot,server=mysql-server

Connector Logs: Detailed Progress

# Real-time logs
docker compose logs -f debezium-connect | grep -E "(Incremental|Snapshot|chunk)"

# Expected output:
# INFO: Incremental snapshot requested for data collections: [inventory.orders]
# INFO: Snapshot window for table 'inventory.orders': [1, 2048]
# INFO: Snapshot - Scanned 2048 rows from table 'inventory.orders'
# INFO: Snapshot window for table 'inventory.orders': [2049, 4096]
# INFO: Snapshot - Scanned 2048 rows from table 'inventory.orders'
# ...
# INFO: Incremental snapshot completed for table 'inventory.orders'

Как оценить оставшееся время:

# Pseudo-code для estimation
total_rows = 1000000  # Из SHOW TABLE STATUS
chunk_size = 2048
avg_chunk_duration_sec = 5  # Засекаем время обработки одного chunk из logs

chunks_remaining = (total_rows - current_row) / chunk_size
estimated_time_sec = chunks_remaining * avg_chunk_duration_sec
estimated_hours = estimated_time_sec / 3600

Prometheus Queries (если настроен JMX Exporter)

# Snapshot running status
debezium_metrics_SnapshotRunning{context="snapshot",server="mysql-server"}

# Remaining table count
debezium_metrics_RemainingTableCount{context="snapshot",server="mysql-server"}

# Alert if snapshot running > 6 hours
(
  debezium_metrics_SnapshotRunning == 1
  and
  time() - debezium_metrics_SnapshotStartTimestamp > 21600
)

Grafana Panel Example:

{
  "title": "Incremental Snapshot Progress",
  "targets": [
    {
      "expr": "debezium_metrics_RemainingTableCount{server=\"mysql-server\"}",
      "legendFormat": "Remaining Tables"
    }
  ],
  "yaxes": [
    {
      "label": "Table Count",
      "min": 0
    }
  ]
}
Tip

В Phase 15 Plan 01 мы создали Grafana dashboards для Debezium monitoring. Для incremental snapshot рекомендуется добавить панель:

Snapshot Status Panel:

  • Metric: debezium_metrics_SnapshotRunning
  • Visualization: Stat panel (показывает “Running” / “Idle”)
  • Alert: Если snapshot running > 12 часов

Snapshot Progress Panel:

  • Metric: debezium_metrics_RemainingTableCount
  • Visualization: Graph (line chart showing decline to 0)
  • Helpful для tracking multi-table snapshots

Read-Only Incremental Snapshots для Aurora Replicas

В некоторых production сценариях CDC connector подключается к Aurora read replica, а не к writer instance:

Зачем CDC с read replica?

  • Offload snapshot и binlog reading с production writer
  • Zero impact на write latency для application
  • Compliance: Read-only access policy (Debezium user не может писать в production)

Проблема: Signal table требует INSERT команды → невозможно на read-only replica.

Решение: Kafka Signal Channel — alternative механизм signaling без database writes.

Архитектура Read-Only Incremental Snapshot

Chunk Processing с Watermarks
LOW Watermark
id=1
chunk range
SELECT * WHERE
id >= 1 AND id < 2049
HIGH Watermark
id=2049
Interleaving с Binlog Events
Chunk 1: rows 1-2048
|
Binlog events
|
Chunk 2: rows 2049-4096
Collision Detection и Deduplication
Проблема: Что если row изменился во время snapshot?
Сценарий Collision
T1:Snapshot читает row id=500 со значением A
T2:UPDATE row id=500 SET value=B (binlog event)
?Какое значение правильное?
Решение: Streaming Priority
1.Debezium сравнивает timestamps
2.Binlog event (streaming) имеет приоритет
3.Snapshot event для этого row пропускается
Результат: Consumer всегда видит актуальное значение. Streaming events (более свежие) имеют приоритет над snapshot events.
Выбор Chunk Size
Chunk SizeКогда использоватьTrade-offs
1024Широкие rows (100+ columns)Медленнее, меньше memory
2048Default — большинство casesBalanced performance
4096Узкие rows (5-10 columns)Быстрее, больше memory
8192Large tables, high-perfМожет вызвать memory pressure

Workflow отличается:

  1. Signal table НЕ используется (replica read-only)
  2. Kafka signal topic создается для передачи commands
  3. Engineer sends signals via kafka-console-producer вместо SQL INSERT
  4. Debezium connector consumes signals from Kafka topic

Prerequisites для Read-Only Snapshot

Danger

Read-only incremental snapshot требует GTID mode ON и дополнительные GTID settings на Aurora:

Обязательные параметры:

-- В Aurora DB Cluster Parameter Group
gtid_mode = ON
enforce_gtid_consistency = ON
replica_preserve_commit_order = ON  -- CRITICAL для read replicas

Почему replica_preserve_commit_order критичен?

  • Debezium читает binlog с read replica, где events replication asynchronous
  • Без replica_preserve_commit_order: Binlog events могут появиться out of order на replica
  • Debezium может прочитать UPDATE event до соответствующего INSERT event
  • Результат: Data inconsistency в Kafka topics

Проверка prerequisites:

-- Подключиться к Aurora read replica
mysql -h aurora-replica.cluster-ro-abc123.us-east-1.rds.amazonaws.com -u debezium -p

-- Проверить GTID mode
SHOW VARIABLES LIKE 'gtid_mode';
-- Expected: ON

-- Проверить enforce_gtid_consistency
SHOW VARIABLES LIKE 'enforce_gtid_consistency';
-- Expected: ON

-- Проверить replica_preserve_commit_order (Aurora 2.10+)
SHOW VARIABLES LIKE 'replica_preserve_commit_order';
-- Expected: ON

Если settings некорректны:

  • Обновить DB Cluster Parameter Group (не DB Parameter Group!)
  • Reboot read replica instances (writer может остаться running)

Kafka Signal Topic Setup

Step 1: Создать Kafka signal topic

docker compose exec kafka kafka-topics \
  --bootstrap-server localhost:9092 \
  --create \
  --topic debezium-signal-mysql \
  --partitions 1 \
  --replication-factor 3 \
  --config retention.ms=86400000  # 24 hours (signals ephemeral)

Topic configuration:

ParameterValueReasoning
--partitions1Single partition для ordered signal processing
--replication-factor3High availability (production)
retention.ms8640000024 hours (signals можно purge после обработки)

Connector Configuration для Kafka Signal Channel

{
  "name": "aurora-replica-cdc-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",

    // CONNECT TO READ REPLICA (not writer)
    "database.hostname": "aurora-cluster.cluster-ro-abc123.us-east-1.rds.amazonaws.com",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "${file:/secrets/mysql-password.txt:password}",
    "database.server.id": "184057",

    "database.include.list": "inventory",
    "table.include.list": "inventory.customers,inventory.orders",
    "topic.prefix": "aurora-mysql",

    // KAFKA SIGNAL CHANNEL (instead of signal.data.collection)
    "signal.kafka.topic": "debezium-signal-mysql",
    "signal.kafka.bootstrap.servers": "kafka:9092",
    "signal.kafka.groupId": "debezium-signal-consumer-aurora",

    // Incremental snapshot chunk size
    "incremental.snapshot.chunk.size": "2048",

    // Schema history
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.aurora-mysql",

    // Snapshot mode
    "snapshot.mode": "initial",

    // GTID source (critical for read replicas)
    "gtid.source.includes": ".*",

    // Other properties
    "heartbeat.interval.ms": "30000",
    "decimal.handling.mode": "precise"
  }
}

Ключевые отличия от database signal table:

- "signal.data.collection": "inventory.debezium_signal"
+ "signal.kafka.topic": "debezium-signal-mysql"
+ "signal.kafka.bootstrap.servers": "kafka:9092"
+ "signal.kafka.groupId": "debezium-signal-consumer-aurora"

Triggering Snapshot через Kafka Signal Topic

Вместо SQL INSERT:

-- ❌ НЕ работает на read-only replica
INSERT INTO debezium_signal (id, type, data) VALUES (...);

Используем kafka-console-producer:

# Trigger incremental snapshot для inventory.orders
docker compose exec kafka kafka-console-producer \
  --bootstrap-server localhost:9092 \
  --topic debezium-signal-mysql \
  --property "parse.key=true" \
  --property "key.separator=:"

# В интерактивном режиме введите (одна строка):
{"id":"aurora-snapshot-001"}:{"type":"execute-snapshot","data":{"data-collections":["inventory.orders"]}}

Format signal message:

Key: {"id": "unique-signal-id"}
Value: {"type": "execute-snapshot", "data": {"data-collections": ["inventory.orders"]}}

Key components:

  • Key: JSON с полем id (должен быть unique)
  • Value: JSON с полями type и data
  • Separator: : (настроен через key.separator)

Filtered snapshot через Kafka:

docker compose exec kafka kafka-console-producer \
  --bootstrap-server localhost:9092 \
  --topic debezium-signal-mysql \
  --property "parse.key=true" \
  --property "key.separator=:"

# Input:
{"id":"aurora-snapshot-002"}:{"type":"execute-snapshot","data":{"data-collections":["inventory.orders"],"additional-conditions":[{"data-collection":"inventory.orders","filter":"created_at >= '2024-01-01'"}]}}

Stop snapshot через Kafka:

{"id":"aurora-stop-001"}:{"type":"stop-snapshot","data":{"data-collections":["inventory.orders"],"type":"INCREMENTAL"}}
Warning

Вопрос: Если signal table проблема для read-only replicas, почему нельзя использовать initial snapshot (snapshot.mode=initial)?

Ответ: Initial snapshot тоже требует write access к базе данных (не для data, но для internal state management):

Initial snapshot steps requiring writes:

  1. Locking phase: LOCK TABLES (read lock, но требует LOCK privilege)
  2. GTID position recording: Некоторые Debezium versions пытаются записать metadata
  3. Snapshot marker: Debezium может создавать temporary tables для tracking snapshot state

Aurora read replica restrictions:

  • LOCK TABLES может быть blocked (зависит от Aurora version и parameter groups)
  • Любые DDL statements (CREATE TABLE, даже temporary) — denied
  • User с read-only grants не может выполнить LOCK operations

Решение:

  • Initial snapshot запускать на writer instance
  • Incremental snapshots на read replica через Kafka signal channel
  • Или использовать backup-based approach (Phase 14 Lesson 09)

Common Signal Table Pitfalls

Pitfall 1: Signal Table без database.include.list

Scenario:

"database.include.list": "inventory",
"signal.data.collection": "admin.debezium_signal"

Signal table в database admin, которая не в whitelist.

Результат:

  • Debezium не capture INSERT events в signal table
  • Incremental snapshot никогда не запускается
  • Нет error logs (silent failure)

Диагностика:

# Check connector logs для signal parsing
docker compose logs debezium-connect | grep "signal"

# Если нет logs вида:
# "INFO: Signal data collection set to 'admin.debezium_signal'"
# Проблема подтверждена

Решение:

"database.include.list": "inventory,admin"

Pitfall 2: Missing PRIMARY KEY

Scenario:

CREATE TABLE debezium_signal (
  id VARCHAR(36),
  type VARCHAR(32),
  data TEXT
  -- ❌ NO PRIMARY KEY
);

Результат:

  • Debezium capture INSERT events, но fails to parse (expects PK для binlog event structure)
  • Silent failure, no snapshot triggered

Диагностика:

SHOW CREATE TABLE debezium_signal\G

Если output не содержит PRIMARY KEY — проблема подтверждена.

Решение:

ALTER TABLE debezium_signal ADD PRIMARY KEY (id);

Pitfall 3: Binlog Retention < Snapshot Duration

Scenario:

  • Incremental snapshot для 500GB table запущен
  • Expected duration: 10 часов
  • Binlog retention: 7 дней (168 часов)
  • Но за время snapshot прошло 8 дней (connector был stopped из-за issue)

Результат:

  • Snapshot начался с binlog position mysql-bin.000120:154
  • После 10 часов snapshot завершен
  • Debezium пытается resume binlog streaming с position mysql-bin.000120:154
  • Файл mysql-bin.000120 purged (retention exceeded)
  • Error: “Cannot replicate because master purged required binary logs”

Диагностика:

-- Check binlog retention
SHOW VARIABLES LIKE 'binlog_expire_logs_seconds';

-- Check available binlog files
SHOW BINARY LOGS;

Mitigation:

Formula:

Required Binlog Retention >= Snapshot Duration + Safety Margin

Example:

  • Snapshot duration estimate: 12 hours
  • Safety margin: 2x (for unexpected delays)
  • Required retention: 12 * 2 = 24 hours (минимум)

Рекомендация: 7 days retention (604800 seconds) для production.

Aurora Configuration:

-- В DB Cluster Parameter Group
binlog_expire_logs_seconds = 604800  -- 7 days

Monitor binlog usage during snapshot:

-- Check binlog disk usage
SELECT
  SUM(ROUND(file_size / 1024 / 1024, 2)) AS binlog_size_mb
FROM information_schema.innodb_tablespaces
WHERE name LIKE 'mysql-bin%';

Aurora CloudWatch metric:

Metric: ChangeLogBytesUsed
Threshold: < 80% of available storage

Pitfall 4: Read-Only Environment без GTID Prerequisites

Scenario:

  • Incremental snapshot configured для Aurora read replica
  • Kafka signal channel setup
  • Но replica_preserve_commit_order=OFF

Результат:

  • Snapshot запускается
  • Debezium читает chunks с replica
  • Binlog events arrive out of order на replica
  • Chunk N содержит UPDATE для row ID=500
  • Но INSERT для row ID=500 еще не реплицирован с writer
  • Debezium publishes UPDATE event без соответствующего CREATE event
  • Consumer application видит UPDATE для non-existent row → data inconsistency

Диагностика:

SHOW VARIABLES LIKE 'replica_preserve_commit_order';
-- If OFF or not set → problem confirmed

Решение:

-- В Aurora DB Cluster Parameter Group
replica_preserve_commit_order = ON

-- Reboot read replica instances

Verification после reboot:

SHOW VARIABLES LIKE 'replica_preserve_commit_order';
-- Expected: ON

Binlog Retention Planning для Large Table Snapshots

Для очень больших таблиц incremental snapshot может длиться дни. Критически важно гарантировать, что binlog files доступны на протяжении всего snapshot.

Estimating Snapshot Duration

Formula:

Snapshot Duration (hours) = (Total Rows / Chunk Size) × Average Chunk Time (seconds) / 3600

Example:

total_rows = 100_000_000  # 100 million rows
chunk_size = 2048
avg_chunk_time_sec = 2  # 2 seconds per chunk (зависит от network, table width, indexes)

chunks = total_rows / chunk_size  # 48,828 chunks
total_time_sec = chunks * avg_chunk_time_sec  # 97,656 seconds
total_hours = total_time_sec / 3600  # 27 hours

Рекомендация: Измерить avg_chunk_time для первых 10 chunks из logs, затем extrapolate.

Retention Formula

Required Retention >= Snapshot Duration + Safety Margin + Normal Operations Buffer

Components:

  1. Snapshot Duration: Estimated hours для incremental snapshot
  2. Safety Margin: 2x factor для unexpected delays (network issues, connector restarts)
  3. Normal Operations Buffer: Minimum retention для non-snapshot periods (например, 3 дня)

Example Calculation:

Snapshot Duration = 27 hours
Safety Margin = 27 × 2 = 54 hours
Normal Operations Buffer = 72 hours (3 days)

Total Required Retention = 27 + 54 + 72 = 153 hours ≈ 7 days

Aurora Maximum Retention:

-- Aurora MySQL 3.x supports up to 90 days (2160 hours)
binlog_expire_logs_seconds = 7776000  -- 90 days

-- Но для большинства cases 7 дней достаточно
binlog_expire_logs_seconds = 604800  -- 7 days

Monitoring Binlog Health During Snapshot

CloudWatch Metrics (Aurora):

Metric: ChangeLogBytesUsed
Description: Total size of binlog files
Alert Threshold: > 80% of available storage

Alert Rule:

AlarmName: BinlogStorageHigh
MetricName: ChangeLogBytesUsed
Namespace: AWS/RDS
Threshold: 80000000000  # 80 GB (если total storage 100 GB)
ComparisonOperator: GreaterThanThreshold
EvaluationPeriods: 2
Period: 300  # 5 minutes

Manual Check:

-- Total binlog size
SHOW BINARY LOGS;

-- Sum file_size column manually или:
SELECT
  ROUND(SUM(file_size) / 1024 / 1024 / 1024, 2) AS total_gb
FROM (
  SELECT file_size FROM information_schema.binlog_files
) AS binlog_summary;

If binlog storage approaching limit:

  1. Pause incremental snapshot (stop-snapshot signal)
  2. Increase retention (если позволяет storage)
  3. Resize Aurora storage (если retention нельзя увеличить)
  4. Resume snapshot после stabilization

Hands-On Exercises

Exercise 1: Setup Signal Table и Trigger Basic Snapshot

Goal: Создать signal table, настроить connector, trigger incremental snapshot для одной таблицы.

Steps:

  1. Создать signal table в MySQL:

    CREATE TABLE inventory.debezium_signal (
      id VARCHAR(36) NOT NULL,
      type VARCHAR(32) NOT NULL,
      data TEXT,
      PRIMARY KEY (id)
    ) ENGINE=InnoDB;
    
    GRANT INSERT, UPDATE, DELETE ON inventory.debezium_signal TO 'debezium'@'%';
    FLUSH PRIVILEGES;
  2. Обновить connector config:

    "signal.data.collection": "inventory.debezium_signal",
    "incremental.snapshot.chunk.size": "2048"
  3. Redeploy connector:

    curl -X PUT http://localhost:8083/connectors/mysql-inventory-connector/config \
      -H "Content-Type: application/json" \
      -d @updated-connector-config.json
  4. Trigger snapshot:

    INSERT INTO inventory.debezium_signal (id, type, data)
    VALUES (UUID(), 'execute-snapshot', '{"data-collections": ["inventory.products"]}');
  5. Monitor logs:

    docker compose logs -f debezium-connect | grep -i "incremental"

Expected outcome:

  • Connector logs show “Incremental snapshot requested”
  • Kafka topic mysql-server.inventory.products receives snapshot events
  • After completion: “Incremental snapshot completed”

Exercise 2: Monitor Snapshot Progress

Goal: Track incremental snapshot через connector logs и estimate completion time.

Steps:

  1. Trigger snapshot для large table:

    INSERT INTO inventory.debezium_signal (id, type, data)
    VALUES (UUID(), 'execute-snapshot', '{"data-collections": ["inventory.orders"]}');
  2. Extract chunk timing:

    docker compose logs debezium-connect | grep "Snapshot window" | tail -20
  3. Calculate average chunk time:

    # Measure time between consecutive chunks from logs
    # Example: Chunk 1 at 10:00:00, Chunk 2 at 10:00:02 → 2 seconds
    avg_chunk_time = 2
  4. Estimate remaining time:

    total_rows = 500000  # From SHOW TABLE STATUS
    chunk_size = 2048
    current_chunk = 50
    
    remaining_chunks = (total_rows / chunk_size) - current_chunk
    estimated_sec = remaining_chunks * avg_chunk_time
    estimated_hours = estimated_sec / 3600
    
    print(f"Estimated completion: {estimated_hours:.2f} hours")

Exercise 3: Stop In-Progress Snapshot

Goal: Cancel incremental snapshot и verify clean stop.

Steps:

  1. Start long-running snapshot:

    INSERT INTO inventory.debezium_signal (id, type, data)
    VALUES (UUID(), 'execute-snapshot', '{"data-collections": ["inventory.orders"]}');
  2. After 1-2 minutes, send stop signal:

    INSERT INTO inventory.debezium_signal (id, type, data)
    VALUES (UUID(), 'stop-snapshot', '{"data-collections": ["inventory.orders"], "type": "INCREMENTAL"}');
  3. Verify stop:

    docker compose logs debezium-connect | grep -E "(stop|abort|cancel)"

Expected outcome:

  • Connector logs show “Incremental snapshot stopped”
  • Current chunk completes before stopping
  • Connector resumes normal binlog streaming

Exercise 4 (Advanced): Configure Kafka Signal Channel для Read-Only Scenario

Goal: Setup Kafka-based signaling for Aurora read replica CDC.

Steps:

  1. Create Kafka signal topic:

    docker compose exec kafka kafka-topics \
      --bootstrap-server localhost:9092 \
      --create \
      --topic debezium-signal-mysql \
      --partitions 1 \
      --replication-factor 1
  2. Update connector config:

    "signal.kafka.topic": "debezium-signal-mysql",
    "signal.kafka.bootstrap.servers": "kafka:9092",
    "signal.kafka.groupId": "debezium-signal-consumer"
  3. Trigger snapshot via Kafka:

    docker compose exec kafka kafka-console-producer \
      --bootstrap-server localhost:9092 \
      --topic debezium-signal-mysql \
      --property "parse.key=true" \
      --property "key.separator=:"
    
    # Input:
    {"id":"test-001"}:{"type":"execute-snapshot","data":{"data-collections":["inventory.products"]}}
  4. Verify signal consumption:

    docker compose logs debezium-connect | grep "Incremental snapshot requested"

Key Takeaways

1. Incremental snapshot решает ограничения initial snapshot

  • On-demand snapshot без connector restart
  • Chunk-based reading (resumable, parallel с binlog streaming)
  • Можно snapshot specific tables (не all-or-nothing)

2. Signal table требует строгую структуру

  • PRIMARY KEY на id обязателен (silent failure без PK)
  • Schema должна быть в database.include.list
  • GRANT INSERT права для debezium user

3. Connector configuration: signal.data.collection

  • Fully-qualified name: {database}.{table}
  • incremental.snapshot.chunk.size по умолчанию 2048 (tune для performance)
  • Signal table schema ДОЛЖНА быть в database whitelist

4. Triggering snapshot через SQL INSERT

  • Basic: {"data-collections": ["table"]}
  • Multiple tables: JSON array
  • Filtered: additional-conditions с SQL WHERE clause
  • Stop: type: "stop-snapshot"

5. Read-only snapshots через Kafka signal channel

  • Альтернатива signal table для Aurora read replicas
  • Требует GTID prerequisites: replica_preserve_commit_order=ON
  • Signals sent via kafka-console-producer
  • signal.kafka.topic вместо signal.data.collection

6. Monitoring через JMX metrics и logs

  • SnapshotRunning, RemainingTableCount (JMX)
  • Connector logs: chunk windows, scanned rows
  • Estimate completion time: (total_rows / chunk_size) × avg_chunk_time

7. Common pitfalls prevention

  • Signal table schema в database.include.list (иначе silent failure)
  • PRIMARY KEY обязателен (no PK = no signals processed)
  • Binlog retention >= snapshot duration + safety margin
  • Read-only environments: GTID prerequisites critical

8. Binlog retention planning для large snapshots

  • Formula: Snapshot Duration + 2× Safety Margin + 3-day buffer
  • Рекомендация: 7 days minimum (604800 seconds)
  • Monitor ChangeLogBytesUsed (Aurora CloudWatch)

Что дальше?

В следующем уроке “MySQL/Aurora Production Troubleshooting” мы изучим:

  • Debugging failed snapshots (initial и incremental)
  • Recovery strategies после binlog purge
  • Schema evolution во время snapshot
  • Performance tuning для large-table snapshots
  • Multi-region CDC challenges (Aurora Global Database)

Модуль 8 roadmap:

  • ✅ Lesson 1-9: MySQL binlog, GTID, Aurora parameter groups, snapshot modes
  • Lesson 12: Incremental snapshots ← Вы здесь
  • 🔜 Lesson 13: Production troubleshooting
  • 🔜 Lesson 14: MySQL/Aurora decision matrix

Check Your Understanding

Score: 0 of 0
Conceptual
Question 1 of 4. Какую проблему incremental snapshot решает по сравнению с initial snapshot?

Finished the lesson?

Mark it as complete to track your progress