Архитектура Kafka Connect
Писать продюсеры и консьюмеры вручную — правильный подход, когда вы контролируете обе стороны потока данных. Но что делать, когда нужно доставить данные из PostgreSQL в Kafka, или из Kafka в Elasticsearch? Можно написать продюсер, который читает из базы и публикует в Kafka. Но такой код нужно поддерживать: обрабатывать offset, управлять повторными попытками, следить за отказами.
Kafka Connect — это фреймворк, решающий именно эту задачу. Он предоставляет стандартизированную модель для перемещения данных между Kafka и внешними системами без написания специализированного кода.
Что такое Kafka Connect
Kafka Connect — это JVM-процесс (worker), который запускает коннекторы (connectors). Коннектор — это плагин, реализующий стандартный интерфейс для работы с конкретной системой: PostgreSQL, MySQL, Elasticsearch, S3, Salesforce. Confluent Hub содержит сотни готовых коннекторов.
Ключевые абстракции Connect:
- Connector — описывает задачу (откуда/куда читать, конфигурация). Не выполняет работу сам.
- Task — единица выполнения. Connector порождает N задач; каждая Task выполняет часть работы.
- Worker — JVM-процесс, который исполняет Task-объекты.
- Converter — сериализатор/десериализатор. Определяет формат данных в Kafka (JSON, Avro, Protobuf).
Standalone режим: разработка и тестирование
Standalone — простейший способ запустить Connect. Один worker-процесс, все коннекторы описаны в файлах конфигурации.
# Запуск Connect в standalone режиме
bin/connect-standalone.sh \
config/connect-standalone.properties \
config/my-file-source.properties
Конфигурация worker (connect-standalone.properties):
bootstrap.servers=localhost:9092
# Хранение offset в локальном файле (не в Kafka)
offset.storage.file.filename=/tmp/connect.offsets
# Конвертеры по умолчанию
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
Ограничения standalone:
- Offset хранится локально (файл) — при потере файла позиция чтения теряется.
- Нет автоматического перебалансирования задач.
- Нет REST API для управления коннекторами.
- Один процесс — нет горизонтального масштабирования.
Когда использовать: разработка, тестирование коннектора, учебные примеры. Не для production.
Distributed режим: production-готовый Connect
В distributed режиме несколько worker-процессов объединяются в кластер через общий group.id. Connect использует тот же механизм координации, что и consumer group — group.id определяет, какие воркеры работают вместе.
# connect-distributed.properties
bootstrap.servers=localhost:9092
# Все worker с одинаковым group.id образуют один Connect-кластер
group.id=connect-cluster
# Внутренние Kafka-топики для хранения состояния
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
# Replication factor внутренних топиков (3 для production)
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
# Конвертеры по умолчанию
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
# REST API
listeners=HTTP://0.0.0.0:8083
rest.advertised.host.name=worker1.example.com
rest.advertised.port=8083
Запуск distributed worker:
bin/connect-distributed.sh config/connect-distributed.properties
Kafka Connect REST API работает на порту 8083 — это основной интерфейс управления в distributed режиме. Все операции (создание, пауза, удаление коннекторов) выполняются через HTTP API, а не через конфигурационные файлы.
Внутренние топики: как Connect хранит состояние
В distributed режиме Connect использует три внутренних Kafka-топика:
| Топик | Назначение | Cleanup Policy |
|---|---|---|
connect-configs | Конфигурации коннекторов и задач | compact |
connect-offsets | Позиции чтения source-коннекторов | compact |
connect-status | Статусы коннекторов и задач | compact |
Worker 1
Worker 1: JVM-процесс Connect. Выполняет назначенные задачи коннекторов. Участвует в перебалансировке при изменении состава кластера. Слушает REST API на порту 8083.Worker 2
Worker 2: участник того же Connect-кластера (одинаковый group.id). Если Worker 1 падает, его задачи перераспределяются на Worker 2 и Worker 3.Worker 3
Worker 3: третий участник кластера. Чем больше воркеров — тем больше задач может исполняться параллельно, и тем выше отказоустойчивость.Connector vs Task: разделение ответственности
Разграничение между Connector и Task — ключевое для понимания параллелизма Connect.
Connector отвечает за:
- Валидацию конфигурации
- Определение списка задач и их параметров (
taskConfigs()) - Мониторинг источника данных
Task отвечает за:
- Фактическое чтение/запись данных
- Source task: метод
poll()возвращает списокSourceRecord - Sink task: метод
put()принимает пакетSinkRecordи записывает во внешнюю систему
Параметр tasks.max определяет максимальное число задач, которые Connect создаст для одного коннектора. Реальное число задач может быть меньше — ограничено числом партиций источника.
tasks.max=4 → Connect создаст до 4 задач
Если источник поддерживает только 2 потока — создастся 2 задачи
Задачи распределяются по воркерам кластера автоматически
При падении воркера его задачи перераспределяются на оставшихся участников кластера. Это ключевое отличие distributed режима от standalone.
Converters: формат данных в Kafka
Converter определяет, как записи сериализуются при записи в Kafka (source) и десериализуются при чтении из Kafka (sink).
Базовые конвертеры:
| Converter Class | Описание |
|---|---|
org.apache.kafka.connect.json.JsonConverter | JSON с опциональной встроенной схемой |
org.apache.kafka.connect.storage.StringConverter | Строка без схемы |
org.apache.kafka.connect.converters.ByteArrayConverter | Сырые байты |
io.confluent.connect.avro.AvroConverter | Avro с Schema Registry |
io.confluent.connect.protobuf.ProtobufConverter | Protobuf с Schema Registry |
Конвертеры задаются на уровне воркера (по умолчанию) или переопределяются на уровне коннектора:
# Конфигурация конкретного коннектора
{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
AvroConverter требует работающего Schema Registry. При сериализации конвертер регистрирует схему в Schema Registry и добавляет 5-байтовый заголовок (magic byte + schema ID) перед Avro-payload. Детально Schema Registry рассматривается в Модуле 06.
REST API: управление коннекторами
В distributed режиме все операции с коннекторами выполняются через REST API на порту 8083.
Основные эндпоинты:
| Метод | Путь | Действие |
|---|---|---|
POST | /connectors | Создать коннектор |
GET | /connectors | Список всех коннекторов |
GET | /connectors/{name} | Конфигурация коннектора |
GET | /connectors/{name}/status | Статус коннектора и задач |
PUT | /connectors/{name}/config | Обновить конфигурацию |
POST | /connectors/{name}/restart | Перезапустить коннектор |
POST | /connectors/{name}/pause | Приостановить |
POST | /connectors/{name}/resume | Возобновить |
DELETE | /connectors/{name} | Удалить коннектор |
GET | /connectors/{name}/tasks/{id}/status | Статус конкретной задачи |
POST | /connectors/{name}/tasks/{id}/restart | Перезапустить задачу |
Создание коннектора:
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "my-jdbc-source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://db:5432/mydb",
"connection.user": "kafka",
"connection.password": "secret",
"table.whitelist": "orders",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "db-",
"tasks.max": "2"
}
}'
Проверка статуса:
curl http://localhost:8083/connectors/my-jdbc-source/status
# {
# "name": "my-jdbc-source",
# "connector": { "state": "RUNNING", "worker_id": "worker1:8083" },
# "tasks": [
# { "id": 0, "state": "RUNNING", "worker_id": "worker1:8083" },
# { "id": 1, "state": "RUNNING", "worker_id": "worker2:8083" }
# ]
# }
Standalone vs Distributed: сравнение
| Свойство | Standalone | Distributed |
|---|---|---|
| Число воркеров | 1 | Несколько (один group.id) |
| Хранение состояния | Локальный файл | Kafka-топики |
| Перебалансировка задач | Нет | Да (при падении воркера) |
| REST API | Ограниченный | Полный (8083) |
| Конфигурация коннекторов | Файлы .properties | REST API |
| Production-использование | Нет | Да |
Ключевые выводы
- Kafka Connect — фреймворк для интеграции без написания producer/consumer кода.
- Standalone — для разработки и тестирования. Distributed — для production.
- В distributed режиме несколько воркеров с одинаковым
group.idобразуют кластер и автоматически перераспределяют задачи при отказах. - Внутренние топики (connect-configs, connect-offsets, connect-status) — механизм персистентности состояния Connect.
- Converter определяет формат сериализации. AvroConverter требует Schema Registry (Module 06).
- REST API на порту 8083 — единственный способ управления коннекторами в distributed режиме.