Prerequisites:
- module-8/09-aurora-snapshot-modes
Incremental Snapshots через Signal Table
Когда Initial Snapshot недостаточно
В предыдущих уроках мы изучили initial snapshot — механизм первоначальной загрузки данных при старте Debezium connector. Этот подход работает отлично для первого запуска, но не подходит для многих production сценариев:
Ограничения Initial Snapshot
Проблема 1: Требует перезапуска connector
- Initial snapshot выполняется только при первом запуске (snapshot.mode=initial)
- Чтобы resnapshot таблицу, нужно остановить и удалить connector, потерять offset
- Downtime для всего CDC pipeline на время resnapshot
Проблема 2: All-or-nothing подход
- Snapshot захватывает все таблицы из table.include.list
- Нельзя resnapshot одну конкретную таблицу, не затрагивая остальные
- Если таблица A нуждается в resnapshot, придется resnapshot и таблицы B, C, D
Проблема 3: Невозможность snapshot новых таблиц
- Добавление таблицы в table.include.list требует connector restart
- Restart триггерит snapshot всех таблиц, включая те, что уже были synced
Реальные сценарии, требующие on-demand snapshot
Сценарий 1: Добавление новой таблицы
Timeline:
- Day 1: Connector запущен с таблицами customers, orders
- Day 30: Разработчики создали новую таблицу products
- Task: Добавить products к CDC без resnapshot customers, orders
Сценарий 2: Восстановление после extended downtime
Scenario:
- Connector был остановлен на 10 дней (technical issue)
- Binlog retention = 7 дней (168 часов)
- Результат: Binlog files для дней 1-3 purged
- Task: Resnapshot данные, потерянные из-за purge
Сценарий 3: Data inconsistency recovery
Issue:
- Consumer application имел bug
- Неправильно обработал UPDATE события для таблицы orders
- Target database (Elasticsearch, PostgreSQL) содержит corrupted данные
- Task: Resnapshot orders для восстановления consistency
Incremental snapshot решает все эти проблемы.
Initial vs Incremental Snapshot: Архитектурное сравнение
execute-snapshot
WHERE id >= N AND id < N+2048
2048 rows
op='r'
op='c/u/d'
id >= 2049
Consumer различает события по полю
source.snapshot:"incremental" для snapshot, "false" для streaming.- - Блокирует binlog streaming
- - Все таблицы сразу
- - Не resumable при crash
- - Требует restart connector
- + Параллельно с streaming
- + Конкретные таблицы on-demand
- + Resumable при crash
- + Через signal table (без restart)
Ключевые отличия
| Характеристика | Initial Snapshot | Incremental Snapshot |
|---|---|---|
| Триггер | Connector start (snapshot.mode=initial) | SQL INSERT в signal table (on-demand) |
| Connector restart | ✅ Требуется | ❌ Не требуется |
| Scope | Все таблицы из table.include.list | Конкретные таблицы (указываются в signal) |
| Метод чтения | Full table scan за одну транзакцию | Chunk-based (default: 2048 rows/chunk) |
| Параллельность с CDC | ❌ Binlog streaming blocked до завершения snapshot | ✅ Binlog streaming продолжается параллельно |
| Locks | Table locks (minimal mode) или no locks (none mode) | No locks (chunk reading через SELECT with PK range) |
| Resume after failure | ❌ Restart from scratch | ✅ Resume from last completed chunk |
| Use Case | Первый запуск connector | On-demand resnapshot, добавление таблиц, recovery |
Почему incremental snapshot называется “incremental”?
- Данные читаются chunk by chunk (incremental progress)
- Каждый chunk обрабатывается независимо, между chunks Debezium обрабатывает binlog events
- Resumable: Если snapshot прерывается, Debezium знает, какие chunks уже прочитаны
Проверка знанийВ чём ключевое архитектурное отличие incremental snapshot от initial snapshot в Debezium?
Signal Table: Механизм управления Debezium
Incremental snapshot триггируется через signal table — специальную служебную таблицу, которую Debezium мониторит для получения команд.
Архитектура Signal Table
["inventory.orders"]}'
["inventory.orders"],
"type":"INCREMENTAL"}'
Без PK: INSERT event не распознается как signal → snapshot не запускается → нет ошибки в логах.
Проверка:
SHOW CREATE TABLE debezium_signal\GИсправление:
ALTER TABLE debezium_signal ADD PRIMARY KEY (id);Ключевое отличие от Initial Snapshot:
- Signal table использует сам binlog для передачи команд Debezium
- Debezium читает INSERT event в signal table как trigger для snapshot
- Команда обрабатывается асинхронно вместе с CDC stream
Signal Table Schema
Signal table имеет фиксированную структуру, которую Debezium expects:
CREATE TABLE debezium_signal (
id VARCHAR(36) NOT NULL, -- Unique signal ID (обязательно PRIMARY KEY)
type VARCHAR(32) NOT NULL, -- Signal type: execute-snapshot, stop-snapshot, etc.
data TEXT, -- JSON payload with signal parameters
PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
КРИТИЧЕСКАЯ ОШИБКА: Signal table без PRIMARY KEY приводит к silent failure.
Что произойдет:
- Вы выполняете INSERT в signal table без PK
- Debezium capture INSERT event из binlog
- Debezium пытается parse event → fails silently (no error logs)
- Incremental snapshot не запускается, никакого сообщения об ошибке
Проверка:
SHOW CREATE TABLE debezium_signal\GОбязательно наличие:
PRIMARY KEY (`id`)Если PRIMARY KEY отсутствует:
ALTER TABLE debezium_signal ADD PRIMARY KEY (id);Column requirements:
| Column | Type | Required | Purpose |
|---|---|---|---|
id | VARCHAR(36) | ✅ | Unique ID (рекомендуется UUID()) |
type | VARCHAR(32) | ✅ | Signal type: execute-snapshot, stop-snapshot |
data | TEXT | ✅ | JSON payload (может быть NULL для некоторых signals) |
Создание signal table:
-- Подключиться к MySQL
mysql -h localhost -P 3307 -u root -pmysql inventory
-- Создать signal table
CREATE TABLE debezium_signal (
id VARCHAR(36) NOT NULL,
type VARCHAR(32) NOT NULL,
data TEXT,
PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Выдать права debezium user
GRANT INSERT, UPDATE, DELETE ON inventory.debezium_signal TO 'debezium'@'%';
FLUSH PRIVILEGES;
Проверка создания:
DESCRIBE debezium_signal;
-- Expected output:
-- +-------+-------------+------+-----+---------+-------+
-- | Field | Type | Null | Key | Default | Extra |
-- +-------+-------------+------+-----+---------+-------+
-- | id | varchar(36) | NO | PRI | NULL | |
-- | type | varchar(32) | NO | | NULL | |
-- | data | text | YES | | NULL | |
-- +-------+-------------+------+-----+---------+-------+
Рекомендация: Используйте MySQL функцию UUID() для генерации уникальных IDs:
INSERT INTO debezium_signal (id, type, data)
VALUES (UUID(), 'execute-snapshot', '...');Почему UUID()?
- Гарантирует uniqueness даже при concurrent signals
- Не требует manual tracking последнего ID
- Compatible с distributed systems (несколько engineers могут отправлять signals одновременно)
Альтернативы:
- Sequential IDs:
'signal-001','signal-002'(требует coordination) - Timestamp-based:
CONCAT('sig-', UNIX_TIMESTAMP())(может иметь collisions)
Проверка знанийПочему signal table обязательно должна иметь PRIMARY KEY и что произойдёт без него?
Connector Configuration для Signal Table
Чтобы Debezium начал мониторить signal table, нужно добавить свойство signal.data.collection в connector config:
{
"name": "mysql-inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
// Table filtering
"database.include.list": "inventory",
"table.include.list": "inventory.customers,inventory.orders,inventory.products",
// Topic naming
"topic.prefix": "mysql-server",
// SIGNAL TABLE CONFIGURATION
"signal.data.collection": "inventory.debezium_signal",
// Incremental snapshot chunk size (optional, default: 2048)
"incremental.snapshot.chunk.size": "2048",
// Schema history
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schema-changes.mysql-server",
// Snapshot mode
"snapshot.mode": "initial",
// Other properties...
"heartbeat.interval.ms": "30000",
"decimal.handling.mode": "precise"
}
}
Ключевые свойства:
1. signal.data.collection
"signal.data.collection": "inventory.debezium_signal"
Что это: Fully-qualified name signal table (format: {database}.{table}).
Важно:
- Signal table schema ДОЛЖНА быть включена в
database.include.list - Если signal table в другой database, добавьте database в whitelist
- Debezium мониторит INSERT events в эту таблицу через binlog
Пример с signal table в отдельной database:
"database.include.list": "inventory,debezium_admin",
"signal.data.collection": "debezium_admin.signal_table"
2. incremental.snapshot.chunk.size
"incremental.snapshot.chunk.size": "2048"
Что это: Количество rows, читаемых за один chunk.
Как выбрать значение:
| Chunk Size | When to Use | Trade-offs |
|---|---|---|
| 1024 | Таблицы с очень широкими rows (100+ columns) | Медленнее, но меньше memory usage |
| 2048 | Default - подходит для большинства cases | Balanced performance |
| 4096 | Таблицы с узкими rows (5-10 columns) | Быстрее, но больше memory |
| 8192 | Large tables, high-performance snapshot | Может вызвать memory pressure |
Formula для оценки chunk size:
Chunk Size = (Desired Memory per Chunk MB) / (Average Row Size KB) * 1024
Пример:
- Average row size: 5 KB
- Desired memory per chunk: 10 MB
- Chunk size = 10 MB / 5 KB = 2048 rows
ЧАСТАЯ ОШИБКА:
"database.include.list": "inventory",
"signal.data.collection": "inventory.debezium_signal"Если inventory в database.include.list — ✅ работает.
НО если:
"database.include.list": "orders_db",
"signal.data.collection": "admin.debezium_signal"Signal table в admin database, которая НЕ в whitelist → signals не будут captured.
Решение:
"database.include.list": "orders_db,admin"Verification: После deployment connector, проверьте logs:
docker compose logs debezium-connect | grep "signal.data.collection"Ожидаемый log:
INFO: Signal data collection set to 'inventory.debezium_signal'Если нет этого сообщения — проверьте whitelist.
Triggering Incremental Snapshot: SQL Examples
Теперь, когда signal table настроена и connector знает о ней, можно trigger incremental snapshot через SQL INSERT.
Базовый Snapshot: Одна таблица
-- Snapshot таблицы inventory.orders
INSERT INTO inventory.debezium_signal (id, type, data)
VALUES (
UUID(),
'execute-snapshot',
'{"data-collections": ["inventory.orders"]}'
);
Что происходит:
- Debezium capture INSERT event из binlog
- Parse
typefield:execute-snapshot - Parse
dataJSON:data-collections = ["inventory.orders"] - Start incremental snapshot для
inventory.orders - Chunk-based reading: SELECT * FROM orders WHERE id >= X AND id < X+2048
- Между chunks обрабатывает binlog events (параллельно)
- После завершения всех chunks: snapshot complete
Expected connector logs:
INFO: Incremental snapshot requested for data collections: [inventory.orders]
INFO: Starting incremental snapshot for table 'inventory.orders'
INFO: Snapshot window for table 'inventory.orders': [1, 2048]
INFO: Snapshot window for table 'inventory.orders': [2049, 4096]
...
INFO: Incremental snapshot completed for table 'inventory.orders'
Snapshot нескольких таблиц
-- Snapshot трех таблиц одновременно
INSERT INTO inventory.debezium_signal (id, type, data)
VALUES (
UUID(),
'execute-snapshot',
'{"data-collections": ["inventory.customers", "inventory.orders", "inventory.products"]}'
);
Обработка:
- Debezium обрабатывает таблицы последовательно (не parallel)
- Порядок: customers → orders → products
- Каждая таблица chunk-based
- Binlog streaming продолжается параллельно для всех таблиц
Filtered Snapshot: Snapshot с WHERE условием
-- Snapshot только orders созданных в 2024 году
INSERT INTO inventory.debezium_signal (id, type, data)
VALUES (
UUID(),
'execute-snapshot',
'{
"data-collections": ["inventory.orders"],
"additional-conditions": [
{
"data-collection": "inventory.orders",
"filter": "created_at >= ''2024-01-01'' AND created_at < ''2025-01-01''"
}
]
}'
);
Важно:
filterfield содержит SQL WHERE condition- Double quotes escaping: Используйте
''(два одинарных quote) вместо'для литералов в JSON - Условие применяется к каждому chunk:
SELECT * WHERE (chunk condition) AND (filter condition)
Use cases:
- Snapshot только recent data (last 30 days)
- Snapshot specific partition (e.g., region = ‘US-EAST’)
- Snapshot failed rows (e.g., sync_status = ‘FAILED’)
Stop In-Progress Snapshot
-- Остановить текущий incremental snapshot
INSERT INTO inventory.debezium_signal (id, type, data)
VALUES (
UUID(),
'stop-snapshot',
'{"data-collections": ["inventory.orders"], "type": "INCREMENTAL"}'
);
Когда использовать:
- Snapshot занимает слишком долго (блокирует другие operations)
- Discovered ошибка в filter condition (неправильные данные snapshot)
- Need to reconfigure chunk size
Что происходит:
- Debezium завершает текущий chunk
- Прекращает snapshot после завершения chunk (не прерывает chunk mid-way)
- Events уже sent to Kafka — остаются (не откатываются)
Resume snapshot:
- Incremental snapshot НЕ resumable после stop-snapshot
- Нужно trigger новый execute-snapshot signal
Snapshot Progress Monitoring
Incremental snapshot может длиться часы для больших таблиц. Важно мониторить прогресс.
JMX Metrics: Real-Time Status
Debezium connector expose JMX metrics для incremental snapshot:
Key Metrics:
| Metric | Type | Meaning |
|---|---|---|
SnapshotRunning | Boolean | true если incremental snapshot в процессе |
SnapshotCompleted | Boolean | true после завершения snapshot |
SnapshotAborted | Boolean | true если snapshot stopped/failed |
ChunkId | String | Current chunk ID (e.g., “1:2048”) |
TotalTableCount | Integer | Количество таблиц в snapshot |
RemainingTableCount | Integer | Оставшиеся таблицы |
Access JMX metrics via JConsole:
# Enable JMX in Kafka Connect (docker-compose.yml)
environment:
KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.port=9999 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
# Connect JConsole to localhost:9999
# Navigate to: debezium.mysql:type=connector-metrics,context=snapshot,server=mysql-server
Connector Logs: Detailed Progress
# Real-time logs
docker compose logs -f debezium-connect | grep -E "(Incremental|Snapshot|chunk)"
# Expected output:
# INFO: Incremental snapshot requested for data collections: [inventory.orders]
# INFO: Snapshot window for table 'inventory.orders': [1, 2048]
# INFO: Snapshot - Scanned 2048 rows from table 'inventory.orders'
# INFO: Snapshot window for table 'inventory.orders': [2049, 4096]
# INFO: Snapshot - Scanned 2048 rows from table 'inventory.orders'
# ...
# INFO: Incremental snapshot completed for table 'inventory.orders'
Как оценить оставшееся время:
# Pseudo-code для estimation
total_rows = 1000000 # Из SHOW TABLE STATUS
chunk_size = 2048
avg_chunk_duration_sec = 5 # Засекаем время обработки одного chunk из logs
chunks_remaining = (total_rows - current_row) / chunk_size
estimated_time_sec = chunks_remaining * avg_chunk_duration_sec
estimated_hours = estimated_time_sec / 3600
Prometheus Queries (если настроен JMX Exporter)
# Snapshot running status
debezium_metrics_SnapshotRunning{context="snapshot",server="mysql-server"}
# Remaining table count
debezium_metrics_RemainingTableCount{context="snapshot",server="mysql-server"}
# Alert if snapshot running > 6 hours
(
debezium_metrics_SnapshotRunning == 1
and
time() - debezium_metrics_SnapshotStartTimestamp > 21600
)
Grafana Panel Example:
{
"title": "Incremental Snapshot Progress",
"targets": [
{
"expr": "debezium_metrics_RemainingTableCount{server=\"mysql-server\"}",
"legendFormat": "Remaining Tables"
}
],
"yaxes": [
{
"label": "Table Count",
"min": 0
}
]
}
В Phase 15 Plan 01 мы создали Grafana dashboards для Debezium monitoring. Для incremental snapshot рекомендуется добавить панель:
Snapshot Status Panel:
- Metric:
debezium_metrics_SnapshotRunning - Visualization: Stat panel (показывает “Running” / “Idle”)
- Alert: Если snapshot running > 12 часов
Snapshot Progress Panel:
- Metric:
debezium_metrics_RemainingTableCount - Visualization: Graph (line chart showing decline to 0)
- Helpful для tracking multi-table snapshots
Read-Only Incremental Snapshots для Aurora Replicas
В некоторых production сценариях CDC connector подключается к Aurora read replica, а не к writer instance:
Зачем CDC с read replica?
- Offload snapshot и binlog reading с production writer
- Zero impact на write latency для application
- Compliance: Read-only access policy (Debezium user не может писать в production)
Проблема: Signal table требует INSERT команды → невозможно на read-only replica.
Решение: Kafka Signal Channel — alternative механизм signaling без database writes.
Архитектура Read-Only Incremental Snapshot
id=1
id >= 1 AND id < 2049
id=2049
| Chunk Size | Когда использовать | Trade-offs |
|---|---|---|
| 1024 | Широкие rows (100+ columns) | Медленнее, меньше memory |
| 2048 | Default — большинство cases | Balanced performance |
| 4096 | Узкие rows (5-10 columns) | Быстрее, больше memory |
| 8192 | Large tables, high-perf | Может вызвать memory pressure |
Workflow отличается:
- Signal table НЕ используется (replica read-only)
- Kafka signal topic создается для передачи commands
- Engineer sends signals via
kafka-console-producerвместо SQL INSERT - Debezium connector consumes signals from Kafka topic
Prerequisites для Read-Only Snapshot
Read-only incremental snapshot требует GTID mode ON и дополнительные GTID settings на Aurora:
Обязательные параметры:
-- В Aurora DB Cluster Parameter Group
gtid_mode = ON
enforce_gtid_consistency = ON
replica_preserve_commit_order = ON -- CRITICAL для read replicasПочему replica_preserve_commit_order критичен?
- Debezium читает binlog с read replica, где events replication asynchronous
- Без
replica_preserve_commit_order: Binlog events могут появиться out of order на replica - Debezium может прочитать UPDATE event до соответствующего INSERT event
- Результат: Data inconsistency в Kafka topics
Проверка prerequisites:
-- Подключиться к Aurora read replica
mysql -h aurora-replica.cluster-ro-abc123.us-east-1.rds.amazonaws.com -u debezium -p
-- Проверить GTID mode
SHOW VARIABLES LIKE 'gtid_mode';
-- Expected: ON
-- Проверить enforce_gtid_consistency
SHOW VARIABLES LIKE 'enforce_gtid_consistency';
-- Expected: ON
-- Проверить replica_preserve_commit_order (Aurora 2.10+)
SHOW VARIABLES LIKE 'replica_preserve_commit_order';
-- Expected: ONЕсли settings некорректны:
- Обновить DB Cluster Parameter Group (не DB Parameter Group!)
- Reboot read replica instances (writer может остаться running)
Kafka Signal Topic Setup
Step 1: Создать Kafka signal topic
docker compose exec kafka kafka-topics \
--bootstrap-server localhost:9092 \
--create \
--topic debezium-signal-mysql \
--partitions 1 \
--replication-factor 3 \
--config retention.ms=86400000 # 24 hours (signals ephemeral)
Topic configuration:
| Parameter | Value | Reasoning |
|---|---|---|
--partitions | 1 | Single partition для ordered signal processing |
--replication-factor | 3 | High availability (production) |
retention.ms | 86400000 | 24 hours (signals можно purge после обработки) |
Connector Configuration для Kafka Signal Channel
{
"name": "aurora-replica-cdc-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
// CONNECT TO READ REPLICA (not writer)
"database.hostname": "aurora-cluster.cluster-ro-abc123.us-east-1.rds.amazonaws.com",
"database.port": "3306",
"database.user": "debezium",
"database.password": "${file:/secrets/mysql-password.txt:password}",
"database.server.id": "184057",
"database.include.list": "inventory",
"table.include.list": "inventory.customers,inventory.orders",
"topic.prefix": "aurora-mysql",
// KAFKA SIGNAL CHANNEL (instead of signal.data.collection)
"signal.kafka.topic": "debezium-signal-mysql",
"signal.kafka.bootstrap.servers": "kafka:9092",
"signal.kafka.groupId": "debezium-signal-consumer-aurora",
// Incremental snapshot chunk size
"incremental.snapshot.chunk.size": "2048",
// Schema history
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schema-changes.aurora-mysql",
// Snapshot mode
"snapshot.mode": "initial",
// GTID source (critical for read replicas)
"gtid.source.includes": ".*",
// Other properties
"heartbeat.interval.ms": "30000",
"decimal.handling.mode": "precise"
}
}
Ключевые отличия от database signal table:
- "signal.data.collection": "inventory.debezium_signal"
+ "signal.kafka.topic": "debezium-signal-mysql"
+ "signal.kafka.bootstrap.servers": "kafka:9092"
+ "signal.kafka.groupId": "debezium-signal-consumer-aurora"
Triggering Snapshot через Kafka Signal Topic
Вместо SQL INSERT:
-- ❌ НЕ работает на read-only replica
INSERT INTO debezium_signal (id, type, data) VALUES (...);
Используем kafka-console-producer:
# Trigger incremental snapshot для inventory.orders
docker compose exec kafka kafka-console-producer \
--bootstrap-server localhost:9092 \
--topic debezium-signal-mysql \
--property "parse.key=true" \
--property "key.separator=:"
# В интерактивном режиме введите (одна строка):
{"id":"aurora-snapshot-001"}:{"type":"execute-snapshot","data":{"data-collections":["inventory.orders"]}}
Format signal message:
Key: {"id": "unique-signal-id"}
Value: {"type": "execute-snapshot", "data": {"data-collections": ["inventory.orders"]}}
Key components:
- Key: JSON с полем
id(должен быть unique) - Value: JSON с полями
typeиdata - Separator:
:(настроен черезkey.separator)
Filtered snapshot через Kafka:
docker compose exec kafka kafka-console-producer \
--bootstrap-server localhost:9092 \
--topic debezium-signal-mysql \
--property "parse.key=true" \
--property "key.separator=:"
# Input:
{"id":"aurora-snapshot-002"}:{"type":"execute-snapshot","data":{"data-collections":["inventory.orders"],"additional-conditions":[{"data-collection":"inventory.orders","filter":"created_at >= '2024-01-01'"}]}}
Stop snapshot через Kafka:
{"id":"aurora-stop-001"}:{"type":"stop-snapshot","data":{"data-collections":["inventory.orders"],"type":"INCREMENTAL"}}
Вопрос: Если signal table проблема для read-only replicas, почему нельзя использовать initial snapshot (snapshot.mode=initial)?
Ответ: Initial snapshot тоже требует write access к базе данных (не для data, но для internal state management):
Initial snapshot steps requiring writes:
- Locking phase:
LOCK TABLES(read lock, но требует LOCK privilege) - GTID position recording: Некоторые Debezium versions пытаются записать metadata
- Snapshot marker: Debezium может создавать temporary tables для tracking snapshot state
Aurora read replica restrictions:
LOCK TABLESможет быть blocked (зависит от Aurora version и parameter groups)- Любые DDL statements (CREATE TABLE, даже temporary) — denied
- User с read-only grants не может выполнить LOCK operations
Решение:
- Initial snapshot запускать на writer instance
- Incremental snapshots на read replica через Kafka signal channel
- Или использовать backup-based approach (Phase 14 Lesson 09)
Common Signal Table Pitfalls
Pitfall 1: Signal Table без database.include.list
Scenario:
"database.include.list": "inventory",
"signal.data.collection": "admin.debezium_signal"
Signal table в database admin, которая не в whitelist.
Результат:
- Debezium не capture INSERT events в signal table
- Incremental snapshot никогда не запускается
- Нет error logs (silent failure)
Диагностика:
# Check connector logs для signal parsing
docker compose logs debezium-connect | grep "signal"
# Если нет logs вида:
# "INFO: Signal data collection set to 'admin.debezium_signal'"
# Проблема подтверждена
Решение:
"database.include.list": "inventory,admin"
Pitfall 2: Missing PRIMARY KEY
Scenario:
CREATE TABLE debezium_signal (
id VARCHAR(36),
type VARCHAR(32),
data TEXT
-- ❌ NO PRIMARY KEY
);
Результат:
- Debezium capture INSERT events, но fails to parse (expects PK для binlog event structure)
- Silent failure, no snapshot triggered
Диагностика:
SHOW CREATE TABLE debezium_signal\G
Если output не содержит PRIMARY KEY — проблема подтверждена.
Решение:
ALTER TABLE debezium_signal ADD PRIMARY KEY (id);
Pitfall 3: Binlog Retention < Snapshot Duration
Scenario:
- Incremental snapshot для 500GB table запущен
- Expected duration: 10 часов
- Binlog retention: 7 дней (168 часов)
- Но за время snapshot прошло 8 дней (connector был stopped из-за issue)
Результат:
- Snapshot начался с binlog position
mysql-bin.000120:154 - После 10 часов snapshot завершен
- Debezium пытается resume binlog streaming с position
mysql-bin.000120:154 - Файл
mysql-bin.000120purged (retention exceeded) - Error: “Cannot replicate because master purged required binary logs”
Диагностика:
-- Check binlog retention
SHOW VARIABLES LIKE 'binlog_expire_logs_seconds';
-- Check available binlog files
SHOW BINARY LOGS;
Mitigation:
Formula:
Required Binlog Retention >= Snapshot Duration + Safety Margin
Example:
- Snapshot duration estimate: 12 hours
- Safety margin: 2x (for unexpected delays)
- Required retention: 12 * 2 = 24 hours (минимум)
Рекомендация: 7 days retention (604800 seconds) для production.
Aurora Configuration:
-- В DB Cluster Parameter Group
binlog_expire_logs_seconds = 604800 -- 7 days
Monitor binlog usage during snapshot:
-- Check binlog disk usage
SELECT
SUM(ROUND(file_size / 1024 / 1024, 2)) AS binlog_size_mb
FROM information_schema.innodb_tablespaces
WHERE name LIKE 'mysql-bin%';
Aurora CloudWatch metric:
Metric: ChangeLogBytesUsed
Threshold: < 80% of available storage
Pitfall 4: Read-Only Environment без GTID Prerequisites
Scenario:
- Incremental snapshot configured для Aurora read replica
- Kafka signal channel setup
- Но
replica_preserve_commit_order=OFF
Результат:
- Snapshot запускается
- Debezium читает chunks с replica
- Binlog events arrive out of order на replica
- Chunk N содержит UPDATE для row ID=500
- Но INSERT для row ID=500 еще не реплицирован с writer
- Debezium publishes UPDATE event без соответствующего CREATE event
- Consumer application видит UPDATE для non-existent row → data inconsistency
Диагностика:
SHOW VARIABLES LIKE 'replica_preserve_commit_order';
-- If OFF or not set → problem confirmed
Решение:
-- В Aurora DB Cluster Parameter Group
replica_preserve_commit_order = ON
-- Reboot read replica instances
Verification после reboot:
SHOW VARIABLES LIKE 'replica_preserve_commit_order';
-- Expected: ON
Binlog Retention Planning для Large Table Snapshots
Для очень больших таблиц incremental snapshot может длиться дни. Критически важно гарантировать, что binlog files доступны на протяжении всего snapshot.
Estimating Snapshot Duration
Formula:
Snapshot Duration (hours) = (Total Rows / Chunk Size) × Average Chunk Time (seconds) / 3600
Example:
total_rows = 100_000_000 # 100 million rows
chunk_size = 2048
avg_chunk_time_sec = 2 # 2 seconds per chunk (зависит от network, table width, indexes)
chunks = total_rows / chunk_size # 48,828 chunks
total_time_sec = chunks * avg_chunk_time_sec # 97,656 seconds
total_hours = total_time_sec / 3600 # 27 hours
Рекомендация: Измерить avg_chunk_time для первых 10 chunks из logs, затем extrapolate.
Retention Formula
Required Retention >= Snapshot Duration + Safety Margin + Normal Operations Buffer
Components:
- Snapshot Duration: Estimated hours для incremental snapshot
- Safety Margin: 2x factor для unexpected delays (network issues, connector restarts)
- Normal Operations Buffer: Minimum retention для non-snapshot periods (например, 3 дня)
Example Calculation:
Snapshot Duration = 27 hours
Safety Margin = 27 × 2 = 54 hours
Normal Operations Buffer = 72 hours (3 days)
Total Required Retention = 27 + 54 + 72 = 153 hours ≈ 7 days
Aurora Maximum Retention:
-- Aurora MySQL 3.x supports up to 90 days (2160 hours)
binlog_expire_logs_seconds = 7776000 -- 90 days
-- Но для большинства cases 7 дней достаточно
binlog_expire_logs_seconds = 604800 -- 7 days
Monitoring Binlog Health During Snapshot
CloudWatch Metrics (Aurora):
Metric: ChangeLogBytesUsed
Description: Total size of binlog files
Alert Threshold: > 80% of available storage
Alert Rule:
AlarmName: BinlogStorageHigh
MetricName: ChangeLogBytesUsed
Namespace: AWS/RDS
Threshold: 80000000000 # 80 GB (если total storage 100 GB)
ComparisonOperator: GreaterThanThreshold
EvaluationPeriods: 2
Period: 300 # 5 minutes
Manual Check:
-- Total binlog size
SHOW BINARY LOGS;
-- Sum file_size column manually или:
SELECT
ROUND(SUM(file_size) / 1024 / 1024 / 1024, 2) AS total_gb
FROM (
SELECT file_size FROM information_schema.binlog_files
) AS binlog_summary;
If binlog storage approaching limit:
- Pause incremental snapshot (stop-snapshot signal)
- Increase retention (если позволяет storage)
- Resize Aurora storage (если retention нельзя увеличить)
- Resume snapshot после stabilization
Hands-On Exercises
Exercise 1: Setup Signal Table и Trigger Basic Snapshot
Goal: Создать signal table, настроить connector, trigger incremental snapshot для одной таблицы.
Steps:
-
Создать signal table в MySQL:
CREATE TABLE inventory.debezium_signal ( id VARCHAR(36) NOT NULL, type VARCHAR(32) NOT NULL, data TEXT, PRIMARY KEY (id) ) ENGINE=InnoDB; GRANT INSERT, UPDATE, DELETE ON inventory.debezium_signal TO 'debezium'@'%'; FLUSH PRIVILEGES; -
Обновить connector config:
"signal.data.collection": "inventory.debezium_signal", "incremental.snapshot.chunk.size": "2048" -
Redeploy connector:
curl -X PUT http://localhost:8083/connectors/mysql-inventory-connector/config \ -H "Content-Type: application/json" \ -d @updated-connector-config.json -
Trigger snapshot:
INSERT INTO inventory.debezium_signal (id, type, data) VALUES (UUID(), 'execute-snapshot', '{"data-collections": ["inventory.products"]}'); -
Monitor logs:
docker compose logs -f debezium-connect | grep -i "incremental"
Expected outcome:
- Connector logs show “Incremental snapshot requested”
- Kafka topic
mysql-server.inventory.productsreceives snapshot events - After completion: “Incremental snapshot completed”
Exercise 2: Monitor Snapshot Progress
Goal: Track incremental snapshot через connector logs и estimate completion time.
Steps:
-
Trigger snapshot для large table:
INSERT INTO inventory.debezium_signal (id, type, data) VALUES (UUID(), 'execute-snapshot', '{"data-collections": ["inventory.orders"]}'); -
Extract chunk timing:
docker compose logs debezium-connect | grep "Snapshot window" | tail -20 -
Calculate average chunk time:
# Measure time between consecutive chunks from logs # Example: Chunk 1 at 10:00:00, Chunk 2 at 10:00:02 → 2 seconds avg_chunk_time = 2 -
Estimate remaining time:
total_rows = 500000 # From SHOW TABLE STATUS chunk_size = 2048 current_chunk = 50 remaining_chunks = (total_rows / chunk_size) - current_chunk estimated_sec = remaining_chunks * avg_chunk_time estimated_hours = estimated_sec / 3600 print(f"Estimated completion: {estimated_hours:.2f} hours")
Exercise 3: Stop In-Progress Snapshot
Goal: Cancel incremental snapshot и verify clean stop.
Steps:
-
Start long-running snapshot:
INSERT INTO inventory.debezium_signal (id, type, data) VALUES (UUID(), 'execute-snapshot', '{"data-collections": ["inventory.orders"]}'); -
After 1-2 minutes, send stop signal:
INSERT INTO inventory.debezium_signal (id, type, data) VALUES (UUID(), 'stop-snapshot', '{"data-collections": ["inventory.orders"], "type": "INCREMENTAL"}'); -
Verify stop:
docker compose logs debezium-connect | grep -E "(stop|abort|cancel)"
Expected outcome:
- Connector logs show “Incremental snapshot stopped”
- Current chunk completes before stopping
- Connector resumes normal binlog streaming
Exercise 4 (Advanced): Configure Kafka Signal Channel для Read-Only Scenario
Goal: Setup Kafka-based signaling for Aurora read replica CDC.
Steps:
-
Create Kafka signal topic:
docker compose exec kafka kafka-topics \ --bootstrap-server localhost:9092 \ --create \ --topic debezium-signal-mysql \ --partitions 1 \ --replication-factor 1 -
Update connector config:
"signal.kafka.topic": "debezium-signal-mysql", "signal.kafka.bootstrap.servers": "kafka:9092", "signal.kafka.groupId": "debezium-signal-consumer" -
Trigger snapshot via Kafka:
docker compose exec kafka kafka-console-producer \ --bootstrap-server localhost:9092 \ --topic debezium-signal-mysql \ --property "parse.key=true" \ --property "key.separator=:" # Input: {"id":"test-001"}:{"type":"execute-snapshot","data":{"data-collections":["inventory.products"]}} -
Verify signal consumption:
docker compose logs debezium-connect | grep "Incremental snapshot requested"
Key Takeaways
1. Incremental snapshot решает ограничения initial snapshot
- On-demand snapshot без connector restart
- Chunk-based reading (resumable, parallel с binlog streaming)
- Можно snapshot specific tables (не all-or-nothing)
2. Signal table требует строгую структуру
- PRIMARY KEY на
idобязателен (silent failure без PK) - Schema должна быть в
database.include.list - GRANT INSERT права для debezium user
3. Connector configuration: signal.data.collection
- Fully-qualified name:
{database}.{table} incremental.snapshot.chunk.sizeпо умолчанию 2048 (tune для performance)- Signal table schema ДОЛЖНА быть в database whitelist
4. Triggering snapshot через SQL INSERT
- Basic:
{"data-collections": ["table"]} - Multiple tables: JSON array
- Filtered:
additional-conditionsс SQL WHERE clause - Stop:
type: "stop-snapshot"
5. Read-only snapshots через Kafka signal channel
- Альтернатива signal table для Aurora read replicas
- Требует GTID prerequisites:
replica_preserve_commit_order=ON - Signals sent via
kafka-console-producer signal.kafka.topicвместоsignal.data.collection
6. Monitoring через JMX metrics и logs
SnapshotRunning,RemainingTableCount(JMX)- Connector logs: chunk windows, scanned rows
- Estimate completion time:
(total_rows / chunk_size) × avg_chunk_time
7. Common pitfalls prevention
- Signal table schema в
database.include.list(иначе silent failure) - PRIMARY KEY обязателен (no PK = no signals processed)
- Binlog retention >= snapshot duration + safety margin
- Read-only environments: GTID prerequisites critical
8. Binlog retention planning для large snapshots
- Formula:
Snapshot Duration + 2× Safety Margin + 3-day buffer - Рекомендация: 7 days minimum (604800 seconds)
- Monitor
ChangeLogBytesUsed(Aurora CloudWatch)
Что дальше?
В следующем уроке “MySQL/Aurora Production Troubleshooting” мы изучим:
- Debugging failed snapshots (initial и incremental)
- Recovery strategies после binlog purge
- Schema evolution во время snapshot
- Performance tuning для large-table snapshots
- Multi-region CDC challenges (Aurora Global Database)
Модуль 8 roadmap:
- ✅ Lesson 1-9: MySQL binlog, GTID, Aurora parameter groups, snapshot modes
- ✅ Lesson 12: Incremental snapshots ← Вы здесь
- 🔜 Lesson 13: Production troubleshooting
- 🔜 Lesson 14: MySQL/Aurora decision matrix
Check Your Understanding
Finished the lesson?
Mark it as complete to track your progress