Требуемые знания:
- module-2/06-snapshot-strategies
Практика: Incremental Snapshot
В этой лабораторной работе мы настроим и выполним incremental snapshot для большой таблицы, используя signaling table для управления процессом и Python consumer для мониторинга.
Цели лабораторной работы
- Создать signaling table для управления incremental snapshots
- Сгенерировать тестовую таблицу с 10,000+ записями
- Развернуть коннектор с поддержкой incremental snapshots
- Запустить snapshot через INSERT в signaling table
- Мониторить процесс snapshot с помощью Python consumer
- Продемонстрировать stop/resume возможности
Предварительные требования
Убедитесь, что лабораторное окружение из Module 1 запущено:
cd labs/
docker compose ps
Все сервисы должны быть в статусе healthy или running.
Архитектура лабораторной работы
10,000 rows
Шаг 1: Создание signaling table
Signaling table — это специальная таблица, через которую можно отправлять команды коннектору.
docker compose exec postgres psql -U postgres -d inventory -c "
-- Создание signaling table
CREATE TABLE IF NOT EXISTS public.debezium_signal (
id VARCHAR(100) PRIMARY KEY,
type VARCHAR(50) NOT NULL,
data TEXT NOT NULL
);
-- Права на таблицу
GRANT INSERT, UPDATE, DELETE ON public.debezium_signal TO postgres;
-- Проверка
\\d public.debezium_signal
"
Структура signaling table:
| Колонка | Тип | Назначение |
|---|---|---|
id | VARCHAR(100) | Уникальный идентификатор сигнала |
type | VARCHAR(50) | Тип сигнала: execute-snapshot, stop-snapshot |
data | TEXT | JSON с параметрами сигнала |
Проверка знанийПочему Debezium использует для управления snapshot таблицу в базе данных, а не REST API Kafka Connect?
Шаг 2: Создание тестовой таблицы
Создадим таблицу orders_large с 10,000 записями для демонстрации chunked снэпшота:
docker compose exec postgres psql -U postgres -d inventory -c "
-- Создание таблицы orders_large
CREATE TABLE IF NOT EXISTS public.orders_large (
id SERIAL PRIMARY KEY,
customer_id INTEGER NOT NULL,
order_date DATE NOT NULL,
total_amount DECIMAL(10,2) NOT NULL,
status VARCHAR(20) DEFAULT 'pending',
notes TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Генерация 10,000 записей
INSERT INTO public.orders_large (customer_id, order_date, total_amount, status, notes)
SELECT
(random() * 1000)::INTEGER + 1 AS customer_id,
'2024-01-01'::DATE + (random() * 365)::INTEGER AS order_date,
(random() * 1000 + 10)::DECIMAL(10,2) AS total_amount,
CASE (random() * 4)::INTEGER
WHEN 0 THEN 'pending'
WHEN 1 THEN 'processing'
WHEN 2 THEN 'shipped'
WHEN 3 THEN 'delivered'
ELSE 'cancelled'
END AS status,
'Auto-generated order #' || generate_series AS notes
FROM generate_series(1, 10000);
-- Проверка количества
SELECT COUNT(*) as total_orders FROM public.orders_large;
"
Ожидаемый результат: total_orders = 10000
Шаг 3: Развертывание коннектора
Создайте коннектор с поддержкой incremental snapshots.
Важно: Используем
snapshot.mode=never, чтобы не делать initial snapshot автоматически. Вместо этого мы запустим incremental snapshot вручную.
# Удалите старый коннектор, если есть
curl -X DELETE http://localhost:8083/connectors/orders-incremental 2>/dev/null
# Создание нового коннектора
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "orders-incremental",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "inventory",
"topic.prefix": "inventory",
"table.include.list": "public.orders_large,public.debezium_signal",
"plugin.name": "pgoutput",
"slot.name": "orders_incremental_slot",
"publication.name": "orders_incremental_pub",
"snapshot.mode": "never",
"signal.data.collection": "public.debezium_signal",
"signal.enabled.channels": "source",
"incremental.snapshot.chunk.size": "512"
}
}'
Объяснение конфигурации
| Параметр | Значение | Назначение |
|---|---|---|
snapshot.mode | never | Не делать initial snapshot |
signal.data.collection | public.debezium_signal | Таблица для приема сигналов |
signal.enabled.channels | source | Включить source-based signaling |
incremental.snapshot.chunk.size | 512 | 512 строк на chunk (малый размер для демонстрации) |
Шаг 4: Проверка коннектора
curl -s http://localhost:8083/connectors/orders-incremental/status | jq .
Ожидаемый статус:
{
"name": "orders-incremental",
"connector": {
"state": "RUNNING"
},
"tasks": [
{
"id": 0,
"state": "RUNNING"
}
]
}
Шаг 5: Python consumer для мониторинга
Откройте JupyterLab: http://localhost:8888
Создайте новый notebook и скопируйте код:
from confluent_kafka import Consumer
import json
from datetime import datetime
# Конфигурация consumer
config = {
'bootstrap.servers': 'kafka:9092',
'group.id': 'snapshot-monitor-group',
'auto.offset.reset': 'earliest',
'enable.auto.commit': True
}
consumer = Consumer(config)
consumer.subscribe(['inventory.public.orders_large'])
# Счетчики
snapshot_count = 0
streaming_count = 0
last_chunk_time = None
chunk_boundary_count = 0
print("=" * 70)
print("Incremental Snapshot Monitor")
print("=" * 70)
print(f"Топик: inventory.public.orders_large")
print(f"Время старта: {datetime.now().strftime('%H:%M:%S')}")
print("=" * 70)
print()
print("Ожидание событий...")
print("После запуска snapshot вы увидите поток 'op=r' событий")
print()
def parse_cdc_event(msg_value):
"""Парсинг CDC события с определением типа."""
try:
event = json.loads(msg_value.decode('utf-8'))
payload = event.get('payload', {})
op = payload.get('op', '?')
source = payload.get('source', {})
is_snapshot = source.get('snapshot') in ('true', 'incremental')
after = payload.get('after', {})
before = payload.get('before', {})
data = after or before
return {
'op': op,
'is_snapshot': is_snapshot,
'data': data,
'source': source
}
except (json.JSONDecodeError, UnicodeDecodeError):
return None
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
print(f"[ERROR] {msg.error()}")
continue
event = parse_cdc_event(msg.value())
if event is None:
continue
op = event['op']
is_snapshot = event['is_snapshot']
data = event['data']
# Определение типа операции
op_names = {
'r': 'SNAPSHOT READ',
'c': 'CREATE',
'u': 'UPDATE',
'd': 'DELETE'
}
op_name = op_names.get(op, f'UNKNOWN({op})')
if is_snapshot and op == 'r':
snapshot_count += 1
# Показываем каждую 100-ю запись для наглядности
if snapshot_count % 100 == 0 or snapshot_count <= 5:
order_id = data.get('id', '?')
total = data.get('total_amount', 0)
print(f"[{op_name}] #{snapshot_count:>5} | order_id={order_id} | total={total}")
# Отслеживание chunk boundaries (каждые 512 записей)
if snapshot_count % 512 == 0:
chunk_boundary_count += 1
now = datetime.now().strftime('%H:%M:%S')
print(f"\n>>> Chunk #{chunk_boundary_count} завершен ({snapshot_count} записей) в {now}\n")
else:
streaming_count += 1
order_id = data.get('id', '?')
print(f"[{op_name}] Streaming event | order_id={order_id}")
except KeyboardInterrupt:
print("\n")
print("=" * 70)
print("Итоги мониторинга")
print("=" * 70)
print(f"Snapshot событий (op='r'): {snapshot_count}")
print(f"Streaming событий (op='c/u/d'): {streaming_count}")
print(f"Chunks обработано: {chunk_boundary_count}")
print(f"Время завершения: {datetime.now().strftime('%H:%M:%S')}")
print("=" * 70)
finally:
consumer.close()
print("Consumer закрыт.")
Запустите ячейку (Shift+Enter). Consumer будет ожидать события.
Шаг 6: Запуск incremental snapshot
Теперь запустим snapshot через INSERT в signaling table.
В отдельном терминале выполните:
docker compose exec postgres psql -U postgres -d inventory -c "
INSERT INTO public.debezium_signal (id, type, data)
VALUES (
'snapshot-orders-' || to_char(now(), 'YYYYMMDD-HH24MISS'),
'execute-snapshot',
'{\"data-collections\": [\"public.orders_large\"], \"type\": \"incremental\"}'
);
"
Что происходит после INSERT
- Debezium читает сигнал из
debezium_signal - Запускает incremental snapshot для
orders_large - Читает таблицу порциями по 512 строк (chunk size)
- Публикует события с
op: 'r'в Kafka - Параллельно продолжает обрабатывать streaming события
В JupyterLab вы увидите:
[SNAPSHOT READ] # 1 | order_id=1 | total=547.23
[SNAPSHOT READ] # 2 | order_id=2 | total=123.45
[SNAPSHOT READ] # 3 | order_id=3 | total=890.12
[SNAPSHOT READ] # 4 | order_id=4 | total=234.56
[SNAPSHOT READ] # 5 | order_id=5 | total=678.90
[SNAPSHOT READ] # 100 | order_id=100 | total=345.67
[SNAPSHOT READ] # 200 | order_id=200 | total=456.78
>>> Chunk #1 завершен (512 записей) в 01:23:45
[SNAPSHOT READ] # 600 | order_id=600 | total=567.89
...
>>> Chunk #19 завершен (9728 записей) в 01:24:15
[SNAPSHOT READ] #10000 | order_id=10000 | total=789.01
>>> Chunk #20 завершен (10000 записей) в 01:24:20
Шаг 7: Остановка snapshot (демонстрация)
Для больших таблиц может понадобиться остановить snapshot, например, перед maintenance window.
Запустите новый snapshot:
docker compose exec postgres psql -U postgres -d inventory -c "
INSERT INTO public.debezium_signal (id, type, data)
VALUES (
'snapshot-test-stop',
'execute-snapshot',
'{\"data-collections\": [\"public.orders_large\"], \"type\": \"incremental\"}'
);
"
Немедленно остановите его:
docker compose exec postgres psql -U postgres -d inventory -c "
INSERT INTO public.debezium_signal (id, type, data)
VALUES (
'stop-snapshot-test',
'stop-snapshot',
'{\"data-collections\": [\"public.orders_large\"], \"type\": \"incremental\"}'
);
"
В логах коннектора вы увидите сообщение об остановке:
docker compose logs connect | grep -i "snapshot" | tail -5
Шаг 8: Filtered snapshot (опционально)
Incremental snapshot поддерживает фильтрацию — можно заснэпшотить только часть данных.
Snapshot только заказов 2025 года:
docker compose exec postgres psql -U postgres -d inventory -c "
INSERT INTO public.debezium_signal (id, type, data)
VALUES (
'snapshot-2025-orders',
'execute-snapshot',
'{
\"data-collections\": [\"public.orders_large\"],
\"type\": \"incremental\",
\"additional-conditions\": [
{
\"data-collection\": \"public.orders_large\",
\"filter\": \"order_date >= ''2025-01-01''\"
}
]
}'
);
"
Применение:
- Пересинхронизация данных за определенный период
- Восстановление после частичной потери данных
- Тестирование на подмножестве данных
Проверка знанийПосле частичной потери данных за последние 24 часа — как пересинхронизировать только затронутые данные без полного snapshot таблицы?
Шаг 9: Мониторинг в PostgreSQL
Пока snapshot выполняется, можно отслеживать активность:
Проверка replication slot
docker compose exec postgres psql -U postgres -c "
SELECT
slot_name,
active,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag
FROM pg_replication_slots
WHERE slot_type = 'logical';
"
Просмотр сигналов
docker compose exec postgres psql -U postgres -d inventory -c "
SELECT id, type, LEFT(data, 50) AS data_preview, pg_xact_commit_timestamp(xmin) AS sent_at
FROM public.debezium_signal
ORDER BY id DESC
LIMIT 5;
"
Очистка после лабораторной работы
# Удаление коннектора
curl -X DELETE http://localhost:8083/connectors/orders-incremental
# Удаление replication slot
docker compose exec postgres psql -U postgres -c "
SELECT pg_drop_replication_slot('orders_incremental_slot');
" 2>/dev/null
# Удаление тестовых таблиц (опционально)
docker compose exec postgres psql -U postgres -d inventory -c "
DROP TABLE IF EXISTS public.orders_large;
DROP TABLE IF EXISTS public.debezium_signal;
"
Типичные проблемы
Snapshot не запускается
Проверьте:
- Коннектор в состоянии RUNNING
- Signaling table указана в
table.include.list signal.data.collectionсовпадает с именем таблицы
curl -s http://localhost:8083/connectors/orders-incremental/config | jq '.["signal.data.collection"]'
Событий нет в consumer
Проверьте:
- Топик создан:
docker compose exec kafka kafka-topics --bootstrap-server localhost:9092 --list | grep orders - Consumer подписан на правильный топик
auto.offset.reset: earliestдля чтения с начала
Chunk слишком большой / маленький
Измените размер и пересоздайте коннектор:
"incremental.snapshot.chunk.size": "2048"
Что мы узнали
- Signaling table — механизм управления snapshot’ами через SQL
- execute-snapshot запускает incremental snapshot для указанных таблиц
- stop-snapshot позволяет прервать snapshot в любой момент
- Python consumer может различать snapshot (
op='r') и streaming события - Chunk boundaries видны по паттерну обработки (каждые N записей)
- Filtered snapshots позволяют синхронизировать подмножество данных
Итоги лабораторной работы
Вы успешно:
- Настроили signaling table для управления snapshots
- Запустили incremental snapshot для 10,000 записей
- Мониторили процесс через Python consumer
- Продемонстрировали stop/resume возможности
- Попробовали filtered snapshot
Эти навыки критически важны для работы с большими таблицами в production, где downtime недопустим.
Что дальше?
В следующем модуле мы рассмотрим продвинутые темы: Schema Evolution и работу с Avro/Schema Registry для типобезопасной передачи данных.
Проверьте понимание
Закончили урок?
Отметьте его как пройденный, чтобы отслеживать свой прогресс