Перейти к содержанию
Learning Platform
Начальный
25 минут
Debezium PostgreSQL Kafka Connect REST API

Требуемые знания:

  • module-1/03-lab-setup

Первый Debezium коннектор

В этом уроке мы развернем PostgreSQL коннектор и увидим, как изменения в базе данных превращаются в события Kafka в реальном времени.

Подготовка: проверка окружения

Убедитесь, что лабораторное окружение запущено:

cd labs/
docker compose ps

Все сервисы должны быть в статусе healthy или running.

Шаг 1: Создание тестовых данных

Создадим таблицу customers и добавим начальные записи:

docker compose exec postgres psql -U postgres -d inventory -c "
CREATE TABLE IF NOT EXISTS customers (
  id SERIAL PRIMARY KEY,
  name VARCHAR(100) NOT NULL,
  email VARCHAR(100) UNIQUE NOT NULL,
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

INSERT INTO customers (name, email) VALUES
  ('Иван Петров', '[email protected]'),
  ('Мария Сидорова', '[email protected]')
ON CONFLICT (email) DO NOTHING;
"

Проверьте, что данные созданы:

docker compose exec postgres psql -U postgres -d inventory -c "SELECT * FROM customers;"

Шаг 2: Проверка готовности Kafka Connect

Убедитесь, что Kafka Connect API доступен:

curl -s http://localhost:8083/connectors | jq .

Ожидаемый ответ — пустой массив (коннекторы еще не созданы):

[]

Проверьте, что Debezium PostgreSQL коннектор доступен:

curl -s http://localhost:8083/connector-plugins | jq '.[].class' | grep -i postgres

Должны увидеть: "io.debezium.connector.postgresql.PostgresConnector"

Шаг 3: Развертывание коннектора

Создайте коннектор через REST API:

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "inventory-connector",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "postgres",
      "database.password": "postgres",
      "database.dbname": "inventory",
      "topic.prefix": "inventory",
      "table.include.list": "public.customers",
      "plugin.name": "pgoutput",
      "slot.name": "debezium_customers",
      "publication.name": "dbz_publication"
    }
  }'

Важно: database.port — это внутренний Docker порт 5432, а не внешний 5433. Коннектор работает внутри Docker сети и обращается к PostgreSQL по его внутреннему адресу.

Объяснение конфигурации

ПараметрЗначениеНазначение
connector.class...PostgresConnectorКласс Debezium коннектора для PostgreSQL
database.hostnamepostgresDocker service name (внутренняя сеть)
database.port5432Внутренний порт PostgreSQL (НЕ 5433!)
database.dbnameinventoryИмя базы данных
topic.prefixinventoryПрефикс Kafka топиков
table.include.listpublic.customersКакие таблицы отслеживать
plugin.namepgoutputВстроенный плагин logical decoding
slot.namedebezium_customersУникальное имя replication slot
publication.namedbz_publicationИмя PostgreSQL publication

Формирование имени топика

Debezium создает топики по шаблону: {topic.prefix}.{schema}.{table}

Для нашей конфигурации:

  • Prefix: inventory
  • Schema: public
  • Table: customers

Результат: топик inventory.public.customers

Проверка знаний
Если в конфигурации коннектора указан topic.prefix=myapp и отслеживается таблица public.users, какое имя получит Kafka-топик?
Ответ
Топик получит имя myapp.public.users. Debezium формирует имя по шаблону {topic.prefix}.{schema}.{table}. Эта трехкомпонентная схема предотвращает коллизии имен, если одинаковые таблицы существуют в разных схемах базы данных.

Шаг 4: Проверка статуса коннектора

Проверьте, что коннектор запустился:

curl -s http://localhost:8083/connectors/inventory-connector/status | jq .

Ожидаемый ответ:

{
  "name": "inventory-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "connect:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "connect:8083"
    }
  ],
  "type": "source"
}

Если state равен FAILED, проверьте логи:

docker compose logs connect | tail -50

Шаг 5: Проверка Kafka топиков

Коннектор должен был создать топик и отправить snapshot существующих данных:

docker compose exec kafka kafka-topics --bootstrap-server localhost:9092 --list

Ожидаемые топики:

connect_configs
connect_offsets
connect_statuses
inventory.public.customers    # <-- наш топик с данными

Шаг 6: Чтение snapshot событий

При первом запуске Debezium делает snapshot — читает все существующие записи и отправляет их в Kafka. Прочитаем эти события:

docker compose exec kafka kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic inventory.public.customers \
  --from-beginning \
  --max-messages 2

Вы увидите JSON-события для каждой записи. Обратите внимание на поле "op": "r" — это означает read (snapshot), а не create.

Проверка знаний
Почему события начального снэпшота имеют op: "r" (read), а не op: "c" (create)?
Ответ
Операция "r" (read) означает, что Debezium читает уже существующие данные из таблицы, а не захватывает новый INSERT. Операция "c" (create) зарезервирована для live-событий, когда приложение выполняет реальный INSERT, и эта транзакция фиксируется в WAL. Различие важно для data pipeline: снэпшот-события можно обработать как bulk load, а live-события -- как инкрементальные обновления.

Шаг 7: Тестирование live capture

Теперь проверим захват изменений в реальном времени. Откройте два терминала:

Терминал 1: Consumer (слушает события)

docker compose exec kafka kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic inventory.public.customers

Терминал 2: Вставка данных

docker compose exec postgres psql -U postgres -d inventory -c "
INSERT INTO customers (name, email) VALUES ('Алексей Козлов', '[email protected]');
"

В терминале с consumer вы увидите новое событие с "op": "c" (create).

Тестирование UPDATE

docker compose exec postgres psql -U postgres -d inventory -c "
UPDATE customers SET name = 'Алексей Александрович Козлов' WHERE email = '[email protected]';
"

Событие будет содержать "op": "u" и поля before (старые данные) и after (новые данные).

Тестирование DELETE

docker compose exec postgres psql -U postgres -d inventory -c "
DELETE FROM customers WHERE email = '[email protected]';
"

Событие будет содержать "op": "d" и только поле before.

Поток данных

psql
PostgreSQL
WAL
Debezium
Kafka
INSERT INTO customersWrite to transaction logOKRead via logical replicationChange eventPublish to inventory.public.customers

Типичные ошибки

Ошибка подключения к базе данных

Connection refused: postgres:5432

Причина: Используется неверный hostname или порт. Решение: Внутри Docker используйте postgres:5432, не localhost:5433.

Коннектор с таким именем уже существует

Connector inventory-connector already exists

Решение: Удалите существующий коннектор:

curl -X DELETE http://localhost:8083/connectors/inventory-connector

Replication slot уже используется

replication slot "debezium_customers" is already active

Причина: Предыдущий коннектор не был корректно остановлен. Решение: Удалите slot вручную:

docker compose exec postgres psql -U postgres -c "SELECT pg_drop_replication_slot('debezium_customers');"

Управление коннектором

Список всех коннекторов

curl -s http://localhost:8083/connectors | jq .

Приостановка коннектора

curl -X PUT http://localhost:8083/connectors/inventory-connector/pause

Возобновление коннектора

curl -X PUT http://localhost:8083/connectors/inventory-connector/resume

Удаление коннектора

curl -X DELETE http://localhost:8083/connectors/inventory-connector

Проверка replication slot

Посмотрите, какие replication slots созданы в PostgreSQL:

docker compose exec postgres psql -U postgres -c "
SELECT slot_name, active, pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as lag
FROM pg_replication_slots;
"

Вы увидите slot debezium_customers в состоянии active = true.

Важно: Если коннектор остановлен надолго, slot продолжает накапливать WAL-сегменты. Следите за размером lag и удаляйте неиспользуемые slots.

Что мы узнали

  1. Коннектор развертывается через REST API — простой POST-запрос с JSON-конфигурацией
  2. Внутренние порты Docker — внутри сети используется postgres:5432, не localhost:5433
  3. Snapshot при старте — Debezium сначала читает все существующие данные
  4. Типы операций: r (read/snapshot), c (create), u (update), d (delete)
  5. Replication slots — PostgreSQL механизм, гарантирующий, что изменения не будут потеряны

Что дальше?

В следующем уроке мы напишем Python-потребитель для обработки CDC-событий и научимся извлекать полезные данные из envelope-структуры Debezium.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Что происходит при первом запуске Debezium коннектора с настройкой snapshot.mode=initial?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс