Перейти к содержанию
Learning Platform
Продвинутый
40 минут
disaster-recovery offsets backup runbooks

Disaster Recovery: Процедуры восстановления CDC

CDC pipeline — это stateful система. Состояние хранится в нескольких местах: offsets в Kafka, replication slot в PostgreSQL. Потеря любой части может привести к дублям или потере данных. В этом уроке мы изучим что может сломаться, как подготовиться и как восстанавливаться.

Production Insight: Untested DR is no DR. Если вы не практикуете восстановление регулярно, вы узнаете о проблемах в самый неподходящий момент.

Что может пойти не так

Возможные отказы и их последствия
Connect Crash
Worker restart
Connector Delete
Accidental
DB Failover
Slot lost
Kafka Loss
Cluster failure
Auto Recovery
From last offset
Slot Cleanup
Remove orphaned
Data Loss
Gap in history
Full Resnapshot
Hours/days
Severity Matrix
Сценарий
Вероятность
Влияние
Connect restart
Высокая
Низкое
Connector deletion
Низкая
Среднее
DB failover
Низкая
Высокое
Kafka loss
Очень низкая
Критическое

Severity Matrix

СценарийВероятностьВлияниеПодготовка
Connect worker restartВысокаяНизкоеАвтоматическое восстановление
Connector task failureСредняяНизкоеRestart task
Connector deletionНизкаяСреднееBackup offsets + slot cleanup
Database failoverНизкаяВысокоеHeartbeat + incremental snapshot
Kafka cluster lossОчень низкаяКритическоеOffsets backup + full resnapshot

Понимание хранения состояния

Где хранится состояние

Распределение состояния CDC

Состояние хранится в нескольких местах - recovery требует координации

Kafka Cluster
connect-configs
Configurations
connect-offsets
LSN positions
connect-statuses
Task states
PostgreSQL
Replication Slot
restart_lsn position
WAL Segments
Retained for slot
Kafka Connect
Worker Process
Task distribution
Running Tasks
In-memory
Recovery требует координации всех трех компонентов

connect-offsets Topic

Это критически важный топик. Содержит текущую позицию каждого коннектора.

Структура записи:

{
  "key": ["inventory-connector", {"server": "inventory"}],
  "value": {
    "lsn": 12345678,
    "txId": 1234,
    "ts_usec": 1706745600000000
  }
}

Ключ: Идентификатор коннектора + server Значение: LSN позиция в PostgreSQL WAL

Проверка знаний
Почему при одновременном сбое Kafka и коннектора Debezium необходимо восстановить Kafka первым?
Ответ
При запуске коннектор обращается к connect-offsets топику в Kafka для определения последней позиции (LSN). Без доступного Kafka коннектор не сможет определить, откуда продолжить чтение — он либо начнет с начала (дубли), либо упадет с ошибкой.

Backup Процедуры

Option 1: kafka-console-consumer (Простой)

#!/bin/bash
# offset-backup.sh - Backup Debezium offsets

KAFKA_BOOTSTRAP="localhost:9092"
OFFSETS_TOPIC="connect-offsets"
BACKUP_DIR="/backups/debezium"
DATE=$(date +%Y%m%d_%H%M%S)
BACKUP_FILE="${BACKUP_DIR}/offsets-${DATE}.txt"

# Создать директорию
mkdir -p $BACKUP_DIR

# Backup всех offsets
docker exec kafka kafka-console-consumer \
  --bootstrap-server $KAFKA_BOOTSTRAP \
  --topic $OFFSETS_TOPIC \
  --from-beginning \
  --property print.key=true \
  --property print.timestamp=true \
  --property key.separator=":::" \
  --timeout-ms 5000 \
  > $BACKUP_FILE 2>/dev/null

# Проверить результат
LINES=$(wc -l < $BACKUP_FILE)
echo "Backed up $LINES offset records to $BACKUP_FILE"

# Сжать старые backup (старше 7 дней)
find $BACKUP_DIR -name "offsets-*.txt" -mtime +7 -exec gzip {} \;

Расписание (crontab):

# Каждые 6 часов
0 */6 * * * /scripts/offset-backup.sh >> /var/log/offset-backup.log 2>&1

Option 2: Kafka Connect REST API (Kafka 3.6+)

Начиная с Kafka Connect 3.6, доступен REST API для управления offsets.

Получить текущие offsets:

curl -X GET http://localhost:8083/connectors/inventory-connector/offsets | jq

Пример ответа:

{
  "offsets": [
    {
      "partition": {
        "server": "inventory"
      },
      "offset": {
        "lsn": 12345678,
        "txId": 1234,
        "ts_usec": 1706745600000000
      }
    }
  ]
}

Backup script с REST API:

#!/bin/bash
# offset-backup-rest.sh - Backup using REST API

CONNECT_URL="http://localhost:8083"
BACKUP_DIR="/backups/debezium"
DATE=$(date +%Y%m%d_%H%M%S)

mkdir -p $BACKUP_DIR

# Получить список коннекторов
CONNECTORS=$(curl -s $CONNECT_URL/connectors | jq -r '.[]')

# Backup каждого коннектора
for CONNECTOR in $CONNECTORS; do
  BACKUP_FILE="${BACKUP_DIR}/${CONNECTOR}-offsets-${DATE}.json"
  curl -s "$CONNECT_URL/connectors/$CONNECTOR/offsets" > $BACKUP_FILE
  echo "Backed up $CONNECTOR to $BACKUP_FILE"
done

Restore Процедуры

Сценарий 1: Kafka Connect Restart

Что происходит: Worker перезапустился, connector автоматически восстанавливается.

Действия: Никаких. Automatic recovery.

Проверка:

# Проверить статус после restart
curl -s http://localhost:8083/connectors/inventory-connector/status | jq

Сценарий 2: Connector Deletion (Accidental)

Что происходит: Кто-то удалил коннектор через REST API. Offsets остались в connect-offsets.

Действия:

# 1. Проверить, что offsets существуют
docker exec kafka kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic connect-offsets \
  --from-beginning \
  --timeout-ms 5000 2>/dev/null | grep inventory-connector

# 2. Пересоздать коннектор с ТОЙ ЖЕ конфигурацией
# Если имя и topic.prefix совпадают - offsets подхватятся автоматически
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @connector-config.json

# 3. Проверить восстановление
curl -s http://localhost:8083/connectors/inventory-connector/status | jq

Важно: Имя коннектора и topic.prefix должны совпадать с оригиналом!

Сценарий 3: Modify Offsets (Reset Position)

Когда нужно: Пропустить problematic data, начать с определенной позиции.

Процедура (Kafka Connect 3.6+):

# 1. ОБЯЗАТЕЛЬНО остановить коннектор
curl -X PUT http://localhost:8083/connectors/inventory-connector/pause

# 2. Дождаться остановки
sleep 5
curl -s http://localhost:8083/connectors/inventory-connector/status | jq '.connector.state'
# Должно быть "PAUSED"

# 3. Модифицировать offsets
curl -X PATCH http://localhost:8083/connectors/inventory-connector/offsets \
  -H "Content-Type: application/json" \
  -d '{
    "offsets": [{
      "partition": {"server": "inventory"},
      "offset": {"lsn": 99999999}
    }]
  }'

# 4. Возобновить коннектор
curl -X PUT http://localhost:8083/connectors/inventory-connector/resume

# 5. Проверить новую позицию
curl -s http://localhost:8083/connectors/inventory-connector/offsets | jq

Warning: Изменение offset на будущую позицию = потеря данных. На прошлую = дубли.

Сценарий 4: Slot Invalidated (wal_status = ‘lost’)

Что происходит: max_slot_wal_keep_size превышен, slot инвалидирован.

Симптомы:

SELECT slot_name, wal_status FROM pg_replication_slots;
-- wal_status = 'lost'

Процедура:

# 1. Коннектор уже не работает - проверить логи
docker logs connect 2>&1 | grep -i "slot"

# 2. Удалить инвалидированный slot
docker exec -it postgres psql -U postgres -d inventory -c "
SELECT pg_drop_replication_slot('debezium_inventory');
"

# 3. Удалить коннектор
curl -X DELETE http://localhost:8083/connectors/inventory-connector

# 4. Подождать cleanup
sleep 10

# 5. Пересоздать коннектор - выполнит ПОЛНЫЙ snapshot
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @connector-config.json

# 6. Мониторить snapshot progress
watch -n 5 'curl -s http://localhost:8083/connectors/inventory-connector/status | jq'

Data Loss: События между последним offset и моментом invalidation потеряны. Full resnapshot восстановит текущее состояние, но не историю изменений.

Orphaned Slot Cleanup

Обнаружение orphaned slots

-- Найти slots без активного коннектора
SELECT
    slot_name,
    active,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal,
    wal_status
FROM pg_replication_slots
WHERE slot_type = 'logical'
  AND active = false;

Безопасный cleanup

Безопасный cleanup orphaned slots
Найден inactive slot
active = false
Коннектор существует?
Да
Коннектор PAUSED?
Да
WAIT
Дождаться resume
Нет
CHECK LOGS
Investigate
Нет
Документация
подтверждает удаление?
Да
DROP SLOT
Безопасно
Нет
ESCALATE
К владельцу
wal_status = 'lost' = точка невозврата
Full resnapshot неизбежен
Проверка статуса slots:
SELECT slot_name, active, wal_status,
  pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag
FROM pg_replication_slots WHERE slot_type='logical';

Cleanup script:

#!/bin/bash
# orphaned-slot-cleanup.sh

CONNECT_URL="http://localhost:8083"
PG_CONTAINER="postgres"
DB="inventory"

# Получить все slots
SLOTS=$(docker exec $PG_CONTAINER psql -U postgres -d $DB -t -c "
SELECT slot_name FROM pg_replication_slots
WHERE slot_type = 'logical' AND active = false;
")

# Получить все коннекторы
CONNECTORS=$(curl -s $CONNECT_URL/connectors | jq -r '.[]')

for SLOT in $SLOTS; do
  SLOT=$(echo $SLOT | tr -d ' ')
  [ -z "$SLOT" ] && continue

  # Проверить, есть ли коннектор для этого slot
  FOUND=false
  for CONN in $CONNECTORS; do
    # Получить slot.name коннектора
    CONN_SLOT=$(curl -s "$CONNECT_URL/connectors/$CONN/config" | jq -r '.["slot.name"] // empty')
    if [ "$CONN_SLOT" = "$SLOT" ]; then
      FOUND=true
      break
    fi
  done

  if [ "$FOUND" = false ]; then
    echo "ORPHANED: $SLOT (no connector found)"
    # Не удаляем автоматически - требуется подтверждение
  else
    echo "ACTIVE: $SLOT (owned by $CONN)"
  fi
done
Проверка знаний
При случайном удалении коннектора через REST API, почему его можно восстановить без full resnapshot, пересоздав с тем же именем и topic.prefix?
Ответ
Offsets хранятся в Kafka топике connect-offsets и привязаны к имени коннектора и topic.prefix. При удалении коннектора offsets НЕ удаляются. Пересоздание коннектора с идентичным именем и конфигурацией автоматически подхватит сохраненные offsets и продолжит с последней позиции.

DR Drill Procedure

Регулярная практика DR — единственный способ убедиться, что процедуры работают.

Quarterly DR Drill Checklist

## Pre-Drill Preparation

- [ ] Schedule maintenance window (1-2 hours)
- [ ] Notify downstream consumers
- [ ] Backup current offsets
- [ ] Document current connector positions

## Drill Steps

### Step 1: Verify Backup (10 min)
- [ ] Run offset backup script
- [ ] Verify backup file is readable
- [ ] Parse and validate offset structure

### Step 2: Simulate Connector Loss (15 min)
- [ ] Delete connector via REST API
- [ ] Verify offsets still in connect-offsets
- [ ] Recreate connector with same config
- [ ] Verify recovery from saved position

### Step 3: Simulate Slot Loss (20 min)
- [ ] Stop connector
- [ ] Drop replication slot manually
- [ ] Restart connector
- [ ] Verify snapshot starts
- [ ] Monitor snapshot completion

### Step 4: Test Offset Modification (15 min)
- [ ] Pause connector
- [ ] Modify offset via REST API
- [ ] Resume connector
- [ ] Verify position changed

## Post-Drill

- [ ] Document any issues found
- [ ] Update runbooks if needed
- [ ] Restore production state
- [ ] Schedule next drill

Lab: Execute DR Drill

Цель: Практика backup и restore процедур.

1. Backup текущих offsets:

# Создать backup
docker exec kafka kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic connect-offsets \
  --from-beginning \
  --timeout-ms 5000 \
  --property print.key=true \
  > /tmp/offsets-backup.txt

cat /tmp/offsets-backup.txt

2. Удалить коннектор:

# Сохранить конфигурацию
curl -s http://localhost:8083/connectors/inventory-connector/config > /tmp/connector-config.json

# Удалить коннектор
curl -X DELETE http://localhost:8083/connectors/inventory-connector

# Проверить удаление
curl -s http://localhost:8083/connectors

3. Восстановить коннектор:

# Пересоздать с той же конфигурацией
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d "{
    \"name\": \"inventory-connector\",
    \"config\": $(cat /tmp/connector-config.json)
  }"

# Проверить восстановление
curl -s http://localhost:8083/connectors/inventory-connector/status | jq

4. Проверить продолжение с правильной позиции:

# Сделать INSERT в БД
docker exec -it postgres psql -U postgres -d inventory -c "
INSERT INTO orders (customer_id, order_date, status)
VALUES (1, NOW(), 'new');
"

# Проверить событие в Kafka
docker exec kafka kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic inventory.public.orders \
  --from-beginning \
  --max-messages 5

Production Runbook Template

# CDC Disaster Recovery Runbook

## Overview
- **System:** Debezium CDC Pipeline
- **Components:** Kafka Connect, PostgreSQL, Kafka
- **RTO:** 30 minutes
- **RPO:** Last committed offset

## Contact Information
- **Primary On-Call:** [name] [phone]
- **Secondary On-Call:** [name] [phone]
- **Database Team:** [name] [phone]

## Quick Reference

### Check Connector Status
curl -s http://localhost:8083/connectors/inventory-connector/status | jq

### Check Slot Status
docker exec postgres psql -U postgres -d inventory -c "
SELECT slot_name, active, wal_status,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag
FROM pg_replication_slots WHERE slot_type='logical';
"

### Restart Failed Task
curl -X POST http://localhost:8083/connectors/inventory-connector/tasks/0/restart

## Scenarios

### Scenario A: Connector FAILED
1. Check logs: docker logs connect 2>&1 | tail -100
2. Identify error type
3. If transient: Restart task
4. If config issue: Fix config, recreate connector

### Scenario B: Slot Invalidated
1. Confirm: wal_status = 'lost'
2. Drop slot
3. Delete connector
4. Recreate connector (will do full snapshot)
5. Monitor snapshot progress

### Scenario C: Kafka Cluster Lost
1. Restore Kafka from backup
2. Restore connect-offsets topic
3. Restart Kafka Connect
4. Connectors will resume from restored offsets

## Recovery Verification
- [ ] Connector status: RUNNING
- [ ] Slot status: active = true
- [ ] New events flowing to Kafka
- [ ] Consumer lag decreasing

Ключевые выводы

  1. Состояние хранится в двух местах: connect-offsets (Kafka) и replication slot (PostgreSQL)

  2. Backup offsets регулярно: kafka-console-consumer или REST API (Kafka 3.6+)

  3. При удалении коннектора: Offsets сохраняются, можно восстановить

  4. При invalidation slot: Потеря данных неизбежна, требуется full resnapshot

  5. Orphaned slots опасны: Могут заполнить диск, но не удаляйте без подтверждения

  6. DR drill обязателен: Практикуйте восстановление ежеквартально

  7. REST API offsets (3.6+): Позволяет читать и модифицировать offsets напрямую

Production Insight: Самая частая ошибка при DR — не проверять, что backup работает. Второе: изменять offsets на stopped коннекторе и забывать его resume. Третье: удалять slot, который принадлежит paused коннектору.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Расставьте следующие failure modes CDC pipeline по возрастанию severity: (1) перезапуск Kafka Connect worker, (2) потеря Kafka кластера, (3) failover базы данных, (4) corruption connect-offsets топика. Какой порядок правильный?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс