Перейти к содержанию
Learning Platform
Продвинутый
45 минут
incremental-snapshot signaling-table hands-on-lab python-consumer

Требуемые знания:

  • module-2/06-snapshot-strategies

Практика: Incremental Snapshot

В этой лабораторной работе мы настроим и выполним incremental snapshot для большой таблицы, используя signaling table для управления процессом и Python consumer для мониторинга.

Цели лабораторной работы

  • Создать signaling table для управления incremental snapshots
  • Сгенерировать тестовую таблицу с 10,000+ записями
  • Развернуть коннектор с поддержкой incremental snapshots
  • Запустить snapshot через INSERT в signaling table
  • Мониторить процесс snapshot с помощью Python consumer
  • Продемонстрировать stop/resume возможности

Предварительные требования

Убедитесь, что лабораторное окружение из Module 1 запущено:

cd labs/
docker compose ps

Все сервисы должны быть в статусе healthy или running.

Архитектура лабораторной работы

PostgreSQL
debezium_signal
trigger
orders_large
10,000 rows
Debezium
orders-incremental
snapshot.mode=never
chunk.size=512
PostgreSQL
chunks
Connector
events
Kafka
consume
Python
Kafka
inventory.public.orders_large
op='r' (snapshot) + op='c/u/d' (streaming)
Python Monitor
Snapshot Monitor
Считает события, отслеживает chunks
Процесс запуска snapshot
1. INSERT signal
2. Connector reads
3. Chunks to Kafka
4. Monitor displays

Шаг 1: Создание signaling table

Signaling table — это специальная таблица, через которую можно отправлять команды коннектору.

docker compose exec postgres psql -U postgres -d inventory -c "
-- Создание signaling table
CREATE TABLE IF NOT EXISTS public.debezium_signal (
    id VARCHAR(100) PRIMARY KEY,
    type VARCHAR(50) NOT NULL,
    data TEXT NOT NULL
);

-- Права на таблицу
GRANT INSERT, UPDATE, DELETE ON public.debezium_signal TO postgres;

-- Проверка
\\d public.debezium_signal
"

Структура signaling table:

КолонкаТипНазначение
idVARCHAR(100)Уникальный идентификатор сигнала
typeVARCHAR(50)Тип сигнала: execute-snapshot, stop-snapshot
dataTEXTJSON с параметрами сигнала
Проверка знаний
Почему Debezium использует для управления snapshot таблицу в базе данных, а не REST API Kafka Connect?
Ответ
Signaling table читается через тот же CDC-пайплайн (WAL -> слот репликации -> pgoutput). INSERT в таблицу обрабатывается в точном порядке событий базы данных, обеспечивая запуск snapshot в конкретной, консистентной точке WAL-потока. REST API обрабатывался бы вне основного потока, усложняя координацию с позицией streaming.

Шаг 2: Создание тестовой таблицы

Создадим таблицу orders_large с 10,000 записями для демонстрации chunked снэпшота:

docker compose exec postgres psql -U postgres -d inventory -c "
-- Создание таблицы orders_large
CREATE TABLE IF NOT EXISTS public.orders_large (
    id SERIAL PRIMARY KEY,
    customer_id INTEGER NOT NULL,
    order_date DATE NOT NULL,
    total_amount DECIMAL(10,2) NOT NULL,
    status VARCHAR(20) DEFAULT 'pending',
    notes TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Генерация 10,000 записей
INSERT INTO public.orders_large (customer_id, order_date, total_amount, status, notes)
SELECT
    (random() * 1000)::INTEGER + 1 AS customer_id,
    '2024-01-01'::DATE + (random() * 365)::INTEGER AS order_date,
    (random() * 1000 + 10)::DECIMAL(10,2) AS total_amount,
    CASE (random() * 4)::INTEGER
        WHEN 0 THEN 'pending'
        WHEN 1 THEN 'processing'
        WHEN 2 THEN 'shipped'
        WHEN 3 THEN 'delivered'
        ELSE 'cancelled'
    END AS status,
    'Auto-generated order #' || generate_series AS notes
FROM generate_series(1, 10000);

-- Проверка количества
SELECT COUNT(*) as total_orders FROM public.orders_large;
"

Ожидаемый результат: total_orders = 10000

Шаг 3: Развертывание коннектора

Создайте коннектор с поддержкой incremental snapshots.

Важно: Используем snapshot.mode=never, чтобы не делать initial snapshot автоматически. Вместо этого мы запустим incremental snapshot вручную.

# Удалите старый коннектор, если есть
curl -X DELETE http://localhost:8083/connectors/orders-incremental 2>/dev/null

# Создание нового коннектора
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "orders-incremental",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "postgres",
      "database.password": "postgres",
      "database.dbname": "inventory",
      "topic.prefix": "inventory",
      "table.include.list": "public.orders_large,public.debezium_signal",
      "plugin.name": "pgoutput",
      "slot.name": "orders_incremental_slot",
      "publication.name": "orders_incremental_pub",

      "snapshot.mode": "never",
      "signal.data.collection": "public.debezium_signal",
      "signal.enabled.channels": "source",
      "incremental.snapshot.chunk.size": "512"
    }
  }'

Объяснение конфигурации

ПараметрЗначениеНазначение
snapshot.modeneverНе делать initial snapshot
signal.data.collectionpublic.debezium_signalТаблица для приема сигналов
signal.enabled.channelssourceВключить source-based signaling
incremental.snapshot.chunk.size512512 строк на chunk (малый размер для демонстрации)

Шаг 4: Проверка коннектора

curl -s http://localhost:8083/connectors/orders-incremental/status | jq .

Ожидаемый статус:

{
  "name": "orders-incremental",
  "connector": {
    "state": "RUNNING"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING"
    }
  ]
}

Шаг 5: Python consumer для мониторинга

Откройте JupyterLab: http://localhost:8888

Создайте новый notebook и скопируйте код:

from confluent_kafka import Consumer
import json
from datetime import datetime

# Конфигурация consumer
config = {
    'bootstrap.servers': 'kafka:9092',
    'group.id': 'snapshot-monitor-group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': True
}

consumer = Consumer(config)
consumer.subscribe(['inventory.public.orders_large'])

# Счетчики
snapshot_count = 0
streaming_count = 0
last_chunk_time = None
chunk_boundary_count = 0

print("=" * 70)
print("Incremental Snapshot Monitor")
print("=" * 70)
print(f"Топик: inventory.public.orders_large")
print(f"Время старта: {datetime.now().strftime('%H:%M:%S')}")
print("=" * 70)
print()
print("Ожидание событий...")
print("После запуска snapshot вы увидите поток 'op=r' событий")
print()

def parse_cdc_event(msg_value):
    """Парсинг CDC события с определением типа."""
    try:
        event = json.loads(msg_value.decode('utf-8'))
        payload = event.get('payload', {})

        op = payload.get('op', '?')
        source = payload.get('source', {})
        is_snapshot = source.get('snapshot') in ('true', 'incremental')

        after = payload.get('after', {})
        before = payload.get('before', {})
        data = after or before

        return {
            'op': op,
            'is_snapshot': is_snapshot,
            'data': data,
            'source': source
        }
    except (json.JSONDecodeError, UnicodeDecodeError):
        return None

try:
    while True:
        msg = consumer.poll(timeout=1.0)

        if msg is None:
            continue

        if msg.error():
            print(f"[ERROR] {msg.error()}")
            continue

        event = parse_cdc_event(msg.value())
        if event is None:
            continue

        op = event['op']
        is_snapshot = event['is_snapshot']
        data = event['data']

        # Определение типа операции
        op_names = {
            'r': 'SNAPSHOT READ',
            'c': 'CREATE',
            'u': 'UPDATE',
            'd': 'DELETE'
        }
        op_name = op_names.get(op, f'UNKNOWN({op})')

        if is_snapshot and op == 'r':
            snapshot_count += 1

            # Показываем каждую 100-ю запись для наглядности
            if snapshot_count % 100 == 0 or snapshot_count <= 5:
                order_id = data.get('id', '?')
                total = data.get('total_amount', 0)
                print(f"[{op_name}] #{snapshot_count:>5} | order_id={order_id} | total={total}")

            # Отслеживание chunk boundaries (каждые 512 записей)
            if snapshot_count % 512 == 0:
                chunk_boundary_count += 1
                now = datetime.now().strftime('%H:%M:%S')
                print(f"\n>>> Chunk #{chunk_boundary_count} завершен ({snapshot_count} записей) в {now}\n")

        else:
            streaming_count += 1
            order_id = data.get('id', '?')
            print(f"[{op_name}] Streaming event | order_id={order_id}")

except KeyboardInterrupt:
    print("\n")
    print("=" * 70)
    print("Итоги мониторинга")
    print("=" * 70)
    print(f"Snapshot событий (op='r'): {snapshot_count}")
    print(f"Streaming событий (op='c/u/d'): {streaming_count}")
    print(f"Chunks обработано: {chunk_boundary_count}")
    print(f"Время завершения: {datetime.now().strftime('%H:%M:%S')}")
    print("=" * 70)

finally:
    consumer.close()
    print("Consumer закрыт.")

Запустите ячейку (Shift+Enter). Consumer будет ожидать события.

Шаг 6: Запуск incremental snapshot

Теперь запустим snapshot через INSERT в signaling table.

В отдельном терминале выполните:

docker compose exec postgres psql -U postgres -d inventory -c "
INSERT INTO public.debezium_signal (id, type, data)
VALUES (
    'snapshot-orders-' || to_char(now(), 'YYYYMMDD-HH24MISS'),
    'execute-snapshot',
    '{\"data-collections\": [\"public.orders_large\"], \"type\": \"incremental\"}'
);
"

Что происходит после INSERT

  1. Debezium читает сигнал из debezium_signal
  2. Запускает incremental snapshot для orders_large
  3. Читает таблицу порциями по 512 строк (chunk size)
  4. Публикует события с op: 'r' в Kafka
  5. Параллельно продолжает обрабатывать streaming события

В JupyterLab вы увидите:

[SNAPSHOT READ] #    1 | order_id=1 | total=547.23
[SNAPSHOT READ] #    2 | order_id=2 | total=123.45
[SNAPSHOT READ] #    3 | order_id=3 | total=890.12
[SNAPSHOT READ] #    4 | order_id=4 | total=234.56
[SNAPSHOT READ] #    5 | order_id=5 | total=678.90
[SNAPSHOT READ] #  100 | order_id=100 | total=345.67
[SNAPSHOT READ] #  200 | order_id=200 | total=456.78

>>> Chunk #1 завершен (512 записей) в 01:23:45

[SNAPSHOT READ] #  600 | order_id=600 | total=567.89
...

>>> Chunk #19 завершен (9728 записей) в 01:24:15

[SNAPSHOT READ] #10000 | order_id=10000 | total=789.01

>>> Chunk #20 завершен (10000 записей) в 01:24:20

Шаг 7: Остановка snapshot (демонстрация)

Для больших таблиц может понадобиться остановить snapshot, например, перед maintenance window.

Запустите новый snapshot:

docker compose exec postgres psql -U postgres -d inventory -c "
INSERT INTO public.debezium_signal (id, type, data)
VALUES (
    'snapshot-test-stop',
    'execute-snapshot',
    '{\"data-collections\": [\"public.orders_large\"], \"type\": \"incremental\"}'
);
"

Немедленно остановите его:

docker compose exec postgres psql -U postgres -d inventory -c "
INSERT INTO public.debezium_signal (id, type, data)
VALUES (
    'stop-snapshot-test',
    'stop-snapshot',
    '{\"data-collections\": [\"public.orders_large\"], \"type\": \"incremental\"}'
);
"

В логах коннектора вы увидите сообщение об остановке:

docker compose logs connect | grep -i "snapshot" | tail -5

Шаг 8: Filtered snapshot (опционально)

Incremental snapshot поддерживает фильтрацию — можно заснэпшотить только часть данных.

Snapshot только заказов 2025 года:

docker compose exec postgres psql -U postgres -d inventory -c "
INSERT INTO public.debezium_signal (id, type, data)
VALUES (
    'snapshot-2025-orders',
    'execute-snapshot',
    '{
        \"data-collections\": [\"public.orders_large\"],
        \"type\": \"incremental\",
        \"additional-conditions\": [
            {
                \"data-collection\": \"public.orders_large\",
                \"filter\": \"order_date >= ''2025-01-01''\"
            }
        ]
    }'
);
"

Применение:

  • Пересинхронизация данных за определенный период
  • Восстановление после частичной потери данных
  • Тестирование на подмножестве данных
Проверка знаний
После частичной потери данных за последние 24 часа — как пересинхронизировать только затронутые данные без полного snapshot таблицы?
Ответ
Используйте filtered incremental snapshot с additional-conditions, указав временной фильтр (например, updated_at > NOW() - INTERVAL '24 hours'). Коннектор прочитает только подходящие строки, продолжая streaming в параллельном режиме. Полный snapshot таблицы не потребуется.

Шаг 9: Мониторинг в PostgreSQL

Пока snapshot выполняется, можно отслеживать активность:

Проверка replication slot

docker compose exec postgres psql -U postgres -c "
SELECT
    slot_name,
    active,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag
FROM pg_replication_slots
WHERE slot_type = 'logical';
"

Просмотр сигналов

docker compose exec postgres psql -U postgres -d inventory -c "
SELECT id, type, LEFT(data, 50) AS data_preview, pg_xact_commit_timestamp(xmin) AS sent_at
FROM public.debezium_signal
ORDER BY id DESC
LIMIT 5;
"

Очистка после лабораторной работы

# Удаление коннектора
curl -X DELETE http://localhost:8083/connectors/orders-incremental

# Удаление replication slot
docker compose exec postgres psql -U postgres -c "
SELECT pg_drop_replication_slot('orders_incremental_slot');
" 2>/dev/null

# Удаление тестовых таблиц (опционально)
docker compose exec postgres psql -U postgres -d inventory -c "
DROP TABLE IF EXISTS public.orders_large;
DROP TABLE IF EXISTS public.debezium_signal;
"

Типичные проблемы

Snapshot не запускается

Проверьте:

  1. Коннектор в состоянии RUNNING
  2. Signaling table указана в table.include.list
  3. signal.data.collection совпадает с именем таблицы
curl -s http://localhost:8083/connectors/orders-incremental/config | jq '.["signal.data.collection"]'

Событий нет в consumer

Проверьте:

  1. Топик создан: docker compose exec kafka kafka-topics --bootstrap-server localhost:9092 --list | grep orders
  2. Consumer подписан на правильный топик
  3. auto.offset.reset: earliest для чтения с начала

Chunk слишком большой / маленький

Измените размер и пересоздайте коннектор:

"incremental.snapshot.chunk.size": "2048"

Что мы узнали

  1. Signaling table — механизм управления snapshot’ами через SQL
  2. execute-snapshot запускает incremental snapshot для указанных таблиц
  3. stop-snapshot позволяет прервать snapshot в любой момент
  4. Python consumer может различать snapshot (op='r') и streaming события
  5. Chunk boundaries видны по паттерну обработки (каждые N записей)
  6. Filtered snapshots позволяют синхронизировать подмножество данных

Итоги лабораторной работы

Итоги лабораторной работыRecommended
1Signaling Table создана
2Orders Large: 10K записей
3Коннектор с incremental snapshot
4Python Monitor запущен
5INSERT execute-snapshot
6Наблюдение chunk-by-chunk
7Stop/Resume демонстрация
8Filtered snapshot
10,000
записей в snapshot
~20
chunks обработано
512
строк на chunk

Вы успешно:

  • Настроили signaling table для управления snapshots
  • Запустили incremental snapshot для 10,000 записей
  • Мониторили процесс через Python consumer
  • Продемонстрировали stop/resume возможности
  • Попробовали filtered snapshot

Эти навыки критически важны для работы с большими таблицами в production, где downtime недопустим.

Что дальше?

В следующем модуле мы рассмотрим продвинутые темы: Schema Evolution и работу с Avro/Schema Registry для типобезопасной передачи данных.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 2. Почему Debezium использует для управления incremental snapshot таблицу в базе данных (signaling table), а не REST API Kafka Connect?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс