Требуемые знания:
- module-1/03-lab-setup
Первый Debezium коннектор
В этом уроке мы развернем PostgreSQL коннектор и увидим, как изменения в базе данных превращаются в события Kafka в реальном времени.
Подготовка: проверка окружения
Убедитесь, что лабораторное окружение запущено:
cd labs/
docker compose ps
Все сервисы должны быть в статусе healthy или running.
Шаг 1: Создание тестовых данных
Создадим таблицу customers и добавим начальные записи:
docker compose exec postgres psql -U postgres -d inventory -c "
CREATE TABLE IF NOT EXISTS customers (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
INSERT INTO customers (name, email) VALUES
('Иван Петров', '[email protected]'),
('Мария Сидорова', '[email protected]')
ON CONFLICT (email) DO NOTHING;
"
Проверьте, что данные созданы:
docker compose exec postgres psql -U postgres -d inventory -c "SELECT * FROM customers;"
Шаг 2: Проверка готовности Kafka Connect
Убедитесь, что Kafka Connect API доступен:
curl -s http://localhost:8083/connectors | jq .
Ожидаемый ответ — пустой массив (коннекторы еще не созданы):
[]
Проверьте, что Debezium PostgreSQL коннектор доступен:
curl -s http://localhost:8083/connector-plugins | jq '.[].class' | grep -i postgres
Должны увидеть: "io.debezium.connector.postgresql.PostgresConnector"
Шаг 3: Развертывание коннектора
Создайте коннектор через REST API:
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "inventory",
"topic.prefix": "inventory",
"table.include.list": "public.customers",
"plugin.name": "pgoutput",
"slot.name": "debezium_customers",
"publication.name": "dbz_publication"
}
}'
Важно:
database.port— это внутренний Docker порт5432, а не внешний5433. Коннектор работает внутри Docker сети и обращается к PostgreSQL по его внутреннему адресу.
Объяснение конфигурации
| Параметр | Значение | Назначение |
|---|---|---|
connector.class | ...PostgresConnector | Класс Debezium коннектора для PostgreSQL |
database.hostname | postgres | Docker service name (внутренняя сеть) |
database.port | 5432 | Внутренний порт PostgreSQL (НЕ 5433!) |
database.dbname | inventory | Имя базы данных |
topic.prefix | inventory | Префикс Kafka топиков |
table.include.list | public.customers | Какие таблицы отслеживать |
plugin.name | pgoutput | Встроенный плагин logical decoding |
slot.name | debezium_customers | Уникальное имя replication slot |
publication.name | dbz_publication | Имя PostgreSQL publication |
Формирование имени топика
Debezium создает топики по шаблону: {topic.prefix}.{schema}.{table}
Для нашей конфигурации:
- Prefix:
inventory - Schema:
public - Table:
customers
Результат: топик inventory.public.customers
Проверка знанийЕсли в конфигурации коннектора указан topic.prefix=myapp и отслеживается таблица public.users, какое имя получит Kafka-топик?
Шаг 4: Проверка статуса коннектора
Проверьте, что коннектор запустился:
curl -s http://localhost:8083/connectors/inventory-connector/status | jq .
Ожидаемый ответ:
{
"name": "inventory-connector",
"connector": {
"state": "RUNNING",
"worker_id": "connect:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "connect:8083"
}
],
"type": "source"
}
Если state равен FAILED, проверьте логи:
docker compose logs connect | tail -50
Шаг 5: Проверка Kafka топиков
Коннектор должен был создать топик и отправить snapshot существующих данных:
docker compose exec kafka kafka-topics --bootstrap-server localhost:9092 --list
Ожидаемые топики:
connect_configs
connect_offsets
connect_statuses
inventory.public.customers # <-- наш топик с данными
Шаг 6: Чтение snapshot событий
При первом запуске Debezium делает snapshot — читает все существующие записи и отправляет их в Kafka. Прочитаем эти события:
docker compose exec kafka kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic inventory.public.customers \
--from-beginning \
--max-messages 2
Вы увидите JSON-события для каждой записи. Обратите внимание на поле "op": "r" — это означает read (snapshot), а не create.
Проверка знанийПочему события начального снэпшота имеют op: "r" (read), а не op: "c" (create)?
Шаг 7: Тестирование live capture
Теперь проверим захват изменений в реальном времени. Откройте два терминала:
Терминал 1: Consumer (слушает события)
docker compose exec kafka kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic inventory.public.customers
Терминал 2: Вставка данных
docker compose exec postgres psql -U postgres -d inventory -c "
INSERT INTO customers (name, email) VALUES ('Алексей Козлов', '[email protected]');
"
В терминале с consumer вы увидите новое событие с "op": "c" (create).
Тестирование UPDATE
docker compose exec postgres psql -U postgres -d inventory -c "
UPDATE customers SET name = 'Алексей Александрович Козлов' WHERE email = '[email protected]';
"
Событие будет содержать "op": "u" и поля before (старые данные) и after (новые данные).
Тестирование DELETE
docker compose exec postgres psql -U postgres -d inventory -c "
DELETE FROM customers WHERE email = '[email protected]';
"
Событие будет содержать "op": "d" и только поле before.
Поток данных
Типичные ошибки
Ошибка подключения к базе данных
Connection refused: postgres:5432
Причина: Используется неверный hostname или порт.
Решение: Внутри Docker используйте postgres:5432, не localhost:5433.
Коннектор с таким именем уже существует
Connector inventory-connector already exists
Решение: Удалите существующий коннектор:
curl -X DELETE http://localhost:8083/connectors/inventory-connector
Replication slot уже используется
replication slot "debezium_customers" is already active
Причина: Предыдущий коннектор не был корректно остановлен. Решение: Удалите slot вручную:
docker compose exec postgres psql -U postgres -c "SELECT pg_drop_replication_slot('debezium_customers');"
Управление коннектором
Список всех коннекторов
curl -s http://localhost:8083/connectors | jq .
Приостановка коннектора
curl -X PUT http://localhost:8083/connectors/inventory-connector/pause
Возобновление коннектора
curl -X PUT http://localhost:8083/connectors/inventory-connector/resume
Удаление коннектора
curl -X DELETE http://localhost:8083/connectors/inventory-connector
Проверка replication slot
Посмотрите, какие replication slots созданы в PostgreSQL:
docker compose exec postgres psql -U postgres -c "
SELECT slot_name, active, pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as lag
FROM pg_replication_slots;
"
Вы увидите slot debezium_customers в состоянии active = true.
Важно: Если коннектор остановлен надолго, slot продолжает накапливать WAL-сегменты. Следите за размером lag и удаляйте неиспользуемые slots.
Что мы узнали
- Коннектор развертывается через REST API — простой POST-запрос с JSON-конфигурацией
- Внутренние порты Docker — внутри сети используется
postgres:5432, неlocalhost:5433 - Snapshot при старте — Debezium сначала читает все существующие данные
- Типы операций:
r(read/snapshot),c(create),u(update),d(delete) - Replication slots — PostgreSQL механизм, гарантирующий, что изменения не будут потеряны
Что дальше?
В следующем уроке мы напишем Python-потребитель для обработки CDC-событий и научимся извлекать полезные данные из envelope-структуры Debezium.
Проверьте понимание
Закончили урок?
Отметьте его как пройденный, чтобы отслеживать свой прогресс