Disaster Recovery: Процедуры восстановления CDC
CDC pipeline — это stateful система. Состояние хранится в нескольких местах: offsets в Kafka, replication slot в PostgreSQL. Потеря любой части может привести к дублям или потере данных. В этом уроке мы изучим что может сломаться, как подготовиться и как восстанавливаться.
Production Insight: Untested DR is no DR. Если вы не практикуете восстановление регулярно, вы узнаете о проблемах в самый неподходящий момент.
Что может пойти не так
Severity Matrix
| Сценарий | Вероятность | Влияние | Подготовка |
|---|---|---|---|
| Connect worker restart | Высокая | Низкое | Автоматическое восстановление |
| Connector task failure | Средняя | Низкое | Restart task |
| Connector deletion | Низкая | Среднее | Backup offsets + slot cleanup |
| Database failover | Низкая | Высокое | Heartbeat + incremental snapshot |
| Kafka cluster loss | Очень низкая | Критическое | Offsets backup + full resnapshot |
Понимание хранения состояния
Где хранится состояние
Состояние хранится в нескольких местах - recovery требует координации
connect-offsets Topic
Это критически важный топик. Содержит текущую позицию каждого коннектора.
Структура записи:
{
"key": ["inventory-connector", {"server": "inventory"}],
"value": {
"lsn": 12345678,
"txId": 1234,
"ts_usec": 1706745600000000
}
}
Ключ: Идентификатор коннектора + server Значение: LSN позиция в PostgreSQL WAL
Проверка знанийПочему при одновременном сбое Kafka и коннектора Debezium необходимо восстановить Kafka первым?
Backup Процедуры
Option 1: kafka-console-consumer (Простой)
#!/bin/bash
# offset-backup.sh - Backup Debezium offsets
KAFKA_BOOTSTRAP="localhost:9092"
OFFSETS_TOPIC="connect-offsets"
BACKUP_DIR="/backups/debezium"
DATE=$(date +%Y%m%d_%H%M%S)
BACKUP_FILE="${BACKUP_DIR}/offsets-${DATE}.txt"
# Создать директорию
mkdir -p $BACKUP_DIR
# Backup всех offsets
docker exec kafka kafka-console-consumer \
--bootstrap-server $KAFKA_BOOTSTRAP \
--topic $OFFSETS_TOPIC \
--from-beginning \
--property print.key=true \
--property print.timestamp=true \
--property key.separator=":::" \
--timeout-ms 5000 \
> $BACKUP_FILE 2>/dev/null
# Проверить результат
LINES=$(wc -l < $BACKUP_FILE)
echo "Backed up $LINES offset records to $BACKUP_FILE"
# Сжать старые backup (старше 7 дней)
find $BACKUP_DIR -name "offsets-*.txt" -mtime +7 -exec gzip {} \;
Расписание (crontab):
# Каждые 6 часов
0 */6 * * * /scripts/offset-backup.sh >> /var/log/offset-backup.log 2>&1
Option 2: Kafka Connect REST API (Kafka 3.6+)
Начиная с Kafka Connect 3.6, доступен REST API для управления offsets.
Получить текущие offsets:
curl -X GET http://localhost:8083/connectors/inventory-connector/offsets | jq
Пример ответа:
{
"offsets": [
{
"partition": {
"server": "inventory"
},
"offset": {
"lsn": 12345678,
"txId": 1234,
"ts_usec": 1706745600000000
}
}
]
}
Backup script с REST API:
#!/bin/bash
# offset-backup-rest.sh - Backup using REST API
CONNECT_URL="http://localhost:8083"
BACKUP_DIR="/backups/debezium"
DATE=$(date +%Y%m%d_%H%M%S)
mkdir -p $BACKUP_DIR
# Получить список коннекторов
CONNECTORS=$(curl -s $CONNECT_URL/connectors | jq -r '.[]')
# Backup каждого коннектора
for CONNECTOR in $CONNECTORS; do
BACKUP_FILE="${BACKUP_DIR}/${CONNECTOR}-offsets-${DATE}.json"
curl -s "$CONNECT_URL/connectors/$CONNECTOR/offsets" > $BACKUP_FILE
echo "Backed up $CONNECTOR to $BACKUP_FILE"
done
Restore Процедуры
Сценарий 1: Kafka Connect Restart
Что происходит: Worker перезапустился, connector автоматически восстанавливается.
Действия: Никаких. Automatic recovery.
Проверка:
# Проверить статус после restart
curl -s http://localhost:8083/connectors/inventory-connector/status | jq
Сценарий 2: Connector Deletion (Accidental)
Что происходит: Кто-то удалил коннектор через REST API. Offsets остались в connect-offsets.
Действия:
# 1. Проверить, что offsets существуют
docker exec kafka kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic connect-offsets \
--from-beginning \
--timeout-ms 5000 2>/dev/null | grep inventory-connector
# 2. Пересоздать коннектор с ТОЙ ЖЕ конфигурацией
# Если имя и topic.prefix совпадают - offsets подхватятся автоматически
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @connector-config.json
# 3. Проверить восстановление
curl -s http://localhost:8083/connectors/inventory-connector/status | jq
Важно: Имя коннектора и topic.prefix должны совпадать с оригиналом!
Сценарий 3: Modify Offsets (Reset Position)
Когда нужно: Пропустить problematic data, начать с определенной позиции.
Процедура (Kafka Connect 3.6+):
# 1. ОБЯЗАТЕЛЬНО остановить коннектор
curl -X PUT http://localhost:8083/connectors/inventory-connector/pause
# 2. Дождаться остановки
sleep 5
curl -s http://localhost:8083/connectors/inventory-connector/status | jq '.connector.state'
# Должно быть "PAUSED"
# 3. Модифицировать offsets
curl -X PATCH http://localhost:8083/connectors/inventory-connector/offsets \
-H "Content-Type: application/json" \
-d '{
"offsets": [{
"partition": {"server": "inventory"},
"offset": {"lsn": 99999999}
}]
}'
# 4. Возобновить коннектор
curl -X PUT http://localhost:8083/connectors/inventory-connector/resume
# 5. Проверить новую позицию
curl -s http://localhost:8083/connectors/inventory-connector/offsets | jq
Warning: Изменение offset на будущую позицию = потеря данных. На прошлую = дубли.
Сценарий 4: Slot Invalidated (wal_status = ‘lost’)
Что происходит: max_slot_wal_keep_size превышен, slot инвалидирован.
Симптомы:
SELECT slot_name, wal_status FROM pg_replication_slots;
-- wal_status = 'lost'
Процедура:
# 1. Коннектор уже не работает - проверить логи
docker logs connect 2>&1 | grep -i "slot"
# 2. Удалить инвалидированный slot
docker exec -it postgres psql -U postgres -d inventory -c "
SELECT pg_drop_replication_slot('debezium_inventory');
"
# 3. Удалить коннектор
curl -X DELETE http://localhost:8083/connectors/inventory-connector
# 4. Подождать cleanup
sleep 10
# 5. Пересоздать коннектор - выполнит ПОЛНЫЙ snapshot
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @connector-config.json
# 6. Мониторить snapshot progress
watch -n 5 'curl -s http://localhost:8083/connectors/inventory-connector/status | jq'
Data Loss: События между последним offset и моментом invalidation потеряны. Full resnapshot восстановит текущее состояние, но не историю изменений.
Orphaned Slot Cleanup
Обнаружение orphaned slots
-- Найти slots без активного коннектора
SELECT
slot_name,
active,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal,
wal_status
FROM pg_replication_slots
WHERE slot_type = 'logical'
AND active = false;
Безопасный cleanup
SELECT slot_name, active, wal_status,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag
FROM pg_replication_slots WHERE slot_type='logical';Cleanup script:
#!/bin/bash
# orphaned-slot-cleanup.sh
CONNECT_URL="http://localhost:8083"
PG_CONTAINER="postgres"
DB="inventory"
# Получить все slots
SLOTS=$(docker exec $PG_CONTAINER psql -U postgres -d $DB -t -c "
SELECT slot_name FROM pg_replication_slots
WHERE slot_type = 'logical' AND active = false;
")
# Получить все коннекторы
CONNECTORS=$(curl -s $CONNECT_URL/connectors | jq -r '.[]')
for SLOT in $SLOTS; do
SLOT=$(echo $SLOT | tr -d ' ')
[ -z "$SLOT" ] && continue
# Проверить, есть ли коннектор для этого slot
FOUND=false
for CONN in $CONNECTORS; do
# Получить slot.name коннектора
CONN_SLOT=$(curl -s "$CONNECT_URL/connectors/$CONN/config" | jq -r '.["slot.name"] // empty')
if [ "$CONN_SLOT" = "$SLOT" ]; then
FOUND=true
break
fi
done
if [ "$FOUND" = false ]; then
echo "ORPHANED: $SLOT (no connector found)"
# Не удаляем автоматически - требуется подтверждение
else
echo "ACTIVE: $SLOT (owned by $CONN)"
fi
done
Проверка знанийПри случайном удалении коннектора через REST API, почему его можно восстановить без full resnapshot, пересоздав с тем же именем и topic.prefix?
DR Drill Procedure
Регулярная практика DR — единственный способ убедиться, что процедуры работают.
Quarterly DR Drill Checklist
## Pre-Drill Preparation
- [ ] Schedule maintenance window (1-2 hours)
- [ ] Notify downstream consumers
- [ ] Backup current offsets
- [ ] Document current connector positions
## Drill Steps
### Step 1: Verify Backup (10 min)
- [ ] Run offset backup script
- [ ] Verify backup file is readable
- [ ] Parse and validate offset structure
### Step 2: Simulate Connector Loss (15 min)
- [ ] Delete connector via REST API
- [ ] Verify offsets still in connect-offsets
- [ ] Recreate connector with same config
- [ ] Verify recovery from saved position
### Step 3: Simulate Slot Loss (20 min)
- [ ] Stop connector
- [ ] Drop replication slot manually
- [ ] Restart connector
- [ ] Verify snapshot starts
- [ ] Monitor snapshot completion
### Step 4: Test Offset Modification (15 min)
- [ ] Pause connector
- [ ] Modify offset via REST API
- [ ] Resume connector
- [ ] Verify position changed
## Post-Drill
- [ ] Document any issues found
- [ ] Update runbooks if needed
- [ ] Restore production state
- [ ] Schedule next drill
Lab: Execute DR Drill
Цель: Практика backup и restore процедур.
1. Backup текущих offsets:
# Создать backup
docker exec kafka kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic connect-offsets \
--from-beginning \
--timeout-ms 5000 \
--property print.key=true \
> /tmp/offsets-backup.txt
cat /tmp/offsets-backup.txt
2. Удалить коннектор:
# Сохранить конфигурацию
curl -s http://localhost:8083/connectors/inventory-connector/config > /tmp/connector-config.json
# Удалить коннектор
curl -X DELETE http://localhost:8083/connectors/inventory-connector
# Проверить удаление
curl -s http://localhost:8083/connectors
3. Восстановить коннектор:
# Пересоздать с той же конфигурацией
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d "{
\"name\": \"inventory-connector\",
\"config\": $(cat /tmp/connector-config.json)
}"
# Проверить восстановление
curl -s http://localhost:8083/connectors/inventory-connector/status | jq
4. Проверить продолжение с правильной позиции:
# Сделать INSERT в БД
docker exec -it postgres psql -U postgres -d inventory -c "
INSERT INTO orders (customer_id, order_date, status)
VALUES (1, NOW(), 'new');
"
# Проверить событие в Kafka
docker exec kafka kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic inventory.public.orders \
--from-beginning \
--max-messages 5
Production Runbook Template
# CDC Disaster Recovery Runbook
## Overview
- **System:** Debezium CDC Pipeline
- **Components:** Kafka Connect, PostgreSQL, Kafka
- **RTO:** 30 minutes
- **RPO:** Last committed offset
## Contact Information
- **Primary On-Call:** [name] [phone]
- **Secondary On-Call:** [name] [phone]
- **Database Team:** [name] [phone]
## Quick Reference
### Check Connector Status
curl -s http://localhost:8083/connectors/inventory-connector/status | jq
### Check Slot Status
docker exec postgres psql -U postgres -d inventory -c "
SELECT slot_name, active, wal_status,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag
FROM pg_replication_slots WHERE slot_type='logical';
"
### Restart Failed Task
curl -X POST http://localhost:8083/connectors/inventory-connector/tasks/0/restart
## Scenarios
### Scenario A: Connector FAILED
1. Check logs: docker logs connect 2>&1 | tail -100
2. Identify error type
3. If transient: Restart task
4. If config issue: Fix config, recreate connector
### Scenario B: Slot Invalidated
1. Confirm: wal_status = 'lost'
2. Drop slot
3. Delete connector
4. Recreate connector (will do full snapshot)
5. Monitor snapshot progress
### Scenario C: Kafka Cluster Lost
1. Restore Kafka from backup
2. Restore connect-offsets topic
3. Restart Kafka Connect
4. Connectors will resume from restored offsets
## Recovery Verification
- [ ] Connector status: RUNNING
- [ ] Slot status: active = true
- [ ] New events flowing to Kafka
- [ ] Consumer lag decreasing
Ключевые выводы
-
Состояние хранится в двух местах: connect-offsets (Kafka) и replication slot (PostgreSQL)
-
Backup offsets регулярно: kafka-console-consumer или REST API (Kafka 3.6+)
-
При удалении коннектора: Offsets сохраняются, можно восстановить
-
При invalidation slot: Потеря данных неизбежна, требуется full resnapshot
-
Orphaned slots опасны: Могут заполнить диск, но не удаляйте без подтверждения
-
DR drill обязателен: Практикуйте восстановление ежеквартально
-
REST API offsets (3.6+): Позволяет читать и модифицировать offsets напрямую
Production Insight: Самая частая ошибка при DR — не проверять, что backup работает. Второе: изменять offsets на stopped коннекторе и забывать его resume. Третье: удалять slot, который принадлежит paused коннектору.
Check Your Understanding
Finished the lesson?
Mark it as complete to track your progress