Learning Platform
Глоссарий Troubleshooting
Урок 06.01 · 25 мин
Средний
Kafka ConnectWorkerTaskDistributed ModeStandalone ModeREST API

Архитектура Kafka Connect

Писать продюсеры и консьюмеры вручную — правильный подход, когда вы контролируете обе стороны потока данных. Но что делать, когда нужно доставить данные из PostgreSQL в Kafka, или из Kafka в Elasticsearch? Можно написать продюсер, который читает из базы и публикует в Kafka. Но такой код нужно поддерживать: обрабатывать offset, управлять повторными попытками, следить за отказами.

Kafka Connect — это фреймворк, решающий именно эту задачу. Он предоставляет стандартизированную модель для перемещения данных между Kafka и внешними системами без написания специализированного кода.


Что такое Kafka Connect

Kafka Connect — это JVM-процесс (worker), который запускает коннекторы (connectors). Коннектор — это плагин, реализующий стандартный интерфейс для работы с конкретной системой: PostgreSQL, MySQL, Elasticsearch, S3, Salesforce. Confluent Hub содержит сотни готовых коннекторов.

Ключевые абстракции Connect:

  • Connector — описывает задачу (откуда/куда читать, конфигурация). Не выполняет работу сам.
  • Task — единица выполнения. Connector порождает N задач; каждая Task выполняет часть работы.
  • Worker — JVM-процесс, который исполняет Task-объекты.
  • Converter — сериализатор/десериализатор. Определяет формат данных в Kafka (JSON, Avro, Protobuf).
Kafka Connect: Pipeline данных
DatabaseБаза данных (JDBC / Debezium): реляционная СУБД (PostgreSQL, MySQL, Oracle). Источник читается через JDBC Source Connector (polling SELECT) или Debezium CDC (binlog/WAL). Debezium — это набор Connect-плагинов для захвата изменений в реальном времени без нагрузки на источник.
Files / S3Файловая система / S3: FileStream Source Connector читает файл построчно. S3 Source Connector читает объекты из Amazon S3 (или MinIO). Полезно для загрузки исторических данных и интеграции с data lake.
REST APIREST API / HTTP Source: HTTP Source Connector делает периодические GET-запросы к внешнему API и публикует ответы в Kafka топик. Используется для интеграции с SaaS-системами (Salesforce, GitHub, Jira).
poll / CDC-события
Source ConnectorSource Connector (источник): плагин Connect, опрашивающий внешнюю систему и преобразующий данные в SourceRecord. Lifecycle: taskConfigs() задаёт параллелизм; poll() возвращает список записей; коннектор сам управляет offset-ами в топике connect-offsets. task.max задаёт максимальное число параллельных задач.
serialize
ConverterКонвертер (Converter): преобразует SourceRecord в байты для записи в Kafka. Настраивается через key.converter и value.converter. Варианты: JsonConverter (JSON с/без схемы), AvroConverter (бинарный Avro + Schema Registry), ProtobufConverter, StringConverter. AvroConverter добавляет 5-байтовый заголовок (magic byte 0x00 + 4-байтовый schema ID).
produce
Kafka ClusterKafka кластер: принимает записи от Source Connector через стандартный Kafka Producer API. Помимо топиков с данными, Connect использует три служебных топика: connect-configs (конфигурация коннекторов и задач), connect-offsets (offsets источников, для source connector), connect-status (статус коннекторов и задач — RUNNING/PAUSED/FAILED). Эти топики должны быть созданы с replication.factor >= 3 в production.
consume
Sink ConnectorSink Connector (приёмник): потребляет записи из Kafka через стандартный Consumer API. Lifecycle: poll() читает батч из топика; put(records) записывает в целевую систему; offset commit происходит только после успешного put() — семантика at-least-once. Sink использует __consumer_offsets для хранения смещений (в отличие от Source, который использует connect-offsets).
write
errors
DLQ TopicDead Letter Queue (DLQ): топик Kafka, куда направляются записи, обработка которых завершилась ошибкой. Настраивается через errors.tolerance=all (вместо errors.tolerance=none по умолчанию) и errors.deadletterqueue.topic.name. Опция errors.deadletterqueue.context.headers.enable=true добавляет заголовки с описанием ошибки для отладки. Записи из DLQ можно переобработать после исправления проблемы.
ElasticsearchElasticsearch Sink: записывает документы в индекс Elasticsearch. Confluent Elasticsearch Connector поддерживает UPSERT по ключу Kafka-сообщения. Используется для полнотекстового поиска и аналитических дашбордов.
Amazon S3Amazon S3 Sink: записывает файлы в S3 (или MinIO, GCS). Поддерживает форматы Avro, Parquet, JSON, CSV. Настройка партиционирования: по времени (TimeBasedPartitioner) или по полю (FieldPartitioner). flush.size задаёт число записей до записи нового файла.
JDBC / RDBMSJDBC Sink: записывает в реляционную СУБД через JDBC. Поддерживает режимы insert, upsert (INSERT OR IGNORE + UPDATE), delete. pk.mode задаёт источник первичного ключа: kafka (использует offset/partition/topic) или record_key (ключ Kafka-сообщения) или record_value (поле из значения).
Worker Node (group.id)Connect Worker (распределённый режим): JVM-процесс, запускающий коннекторы и задачи. Все workers с одинаковым group.id объединяются в кластер и автоматически перераспределяют задачи при добавлении или сбое worker-а. REST API на порту 8083 позволяет создавать, настраивать и мониторить коннекторы через HTTP без перезапуска worker-а.
REST APIPOST /connectorsConnect REST API (порт 8083): управление коннекторами через HTTP. POST /connectors — создать коннектор. GET /connectors/{name}/status — получить статус. PUT /connectors/{name}/config — обновить конфигурацию. POST /connectors/{name}/restart — перезапустить упавший коннектор. DELETE /connectors/{name} — удалить коннектор.
Taskstask.max=NПараллелизм задач: task.max определяет максимальное число параллельных задач (Tasks) для коннектора. Каждая задача — независимый поток обработки. Worker-кластер распределяет задачи между доступными workers через протокол на базе connect-configs топика. Фактическое число задач = min(task.max, возможности источника).

Standalone режим: разработка и тестирование

Standalone — простейший способ запустить Connect. Один worker-процесс, все коннекторы описаны в файлах конфигурации.

# Запуск Connect в standalone режиме
bin/connect-standalone.sh \
  config/connect-standalone.properties \
  config/my-file-source.properties

Конфигурация worker (connect-standalone.properties):

bootstrap.servers=localhost:9092

# Хранение offset в локальном файле (не в Kafka)
offset.storage.file.filename=/tmp/connect.offsets

# Конвертеры по умолчанию
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

Ограничения standalone:

  • Offset хранится локально (файл) — при потере файла позиция чтения теряется.
  • Нет автоматического перебалансирования задач.
  • Нет REST API для управления коннекторами.
  • Один процесс — нет горизонтального масштабирования.

Когда использовать: разработка, тестирование коннектора, учебные примеры. Не для production.


Distributed режим: production-готовый Connect

В distributed режиме несколько worker-процессов объединяются в кластер через общий group.id. Connect использует тот же механизм координации, что и consumer group — group.id определяет, какие воркеры работают вместе.

# connect-distributed.properties
bootstrap.servers=localhost:9092

# Все worker с одинаковым group.id образуют один Connect-кластер
group.id=connect-cluster

# Внутренние Kafka-топики для хранения состояния
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status

# Replication factor внутренних топиков (3 для production)
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3

# Конвертеры по умолчанию
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# REST API
listeners=HTTP://0.0.0.0:8083
rest.advertised.host.name=worker1.example.com
rest.advertised.port=8083

Запуск distributed worker:

bin/connect-distributed.sh config/connect-distributed.properties
TIP

Kafka Connect REST API работает на порту 8083 — это основной интерфейс управления в distributed режиме. Все операции (создание, пауза, удаление коннекторов) выполняются через HTTP API, а не через конфигурационные файлы.


Внутренние топики: как Connect хранит состояние

В distributed режиме Connect использует три внутренних Kafka-топика:

ТопикНазначениеCleanup Policy
connect-configsКонфигурации коннекторов и задачcompact
connect-offsetsПозиции чтения source-коннекторовcompact
connect-statusСтатусы коннекторов и задачcompact
Внутренние топики Kafka Connect

Worker 1

Worker 1: JVM-процесс Connect. Выполняет назначенные задачи коннекторов. Участвует в перебалансировке при изменении состава кластера. Слушает REST API на порту 8083.

Worker 2

Worker 2: участник того же Connect-кластера (одинаковый group.id). Если Worker 1 падает, его задачи перераспределяются на Worker 2 и Worker 3.

Worker 3

Worker 3: третий участник кластера. Чем больше воркеров — тем больше задач может исполняться параллельно, и тем выше отказоустойчивость.
read/write state
connect-configsХранит конфигурации всех коннекторов и задач. Log-compacted топик — всегда доступна актуальная конфигурация. При добавлении нового воркера он читает этот топик и восстанавливает все конфигурации.
connect-offsetsХранит offset-позиции source-коннекторов: позиция в файле, значение incrementing-колонки JDBC, binlog position Debezium. Записывается после каждого успешного produce-батча. Compacted — хранит только последний offset на каждый ключ.
connect-statusСтатусы коннекторов (RUNNING, PAUSED, FAILED, UNASSIGNED) и каждой задачи. Обновляется при изменении состояния. REST API /connectors/{name}/status читает именно этот топик.

Connector vs Task: разделение ответственности

Разграничение между Connector и Task — ключевое для понимания параллелизма Connect.

Connector отвечает за:

  • Валидацию конфигурации
  • Определение списка задач и их параметров (taskConfigs())
  • Мониторинг источника данных

Task отвечает за:

  • Фактическое чтение/запись данных
  • Source task: метод poll() возвращает список SourceRecord
  • Sink task: метод put() принимает пакет SinkRecord и записывает во внешнюю систему

Параметр tasks.max определяет максимальное число задач, которые Connect создаст для одного коннектора. Реальное число задач может быть меньше — ограничено числом партиций источника.

tasks.max=4  →  Connect создаст до 4 задач
                Если источник поддерживает только 2 потока — создастся 2 задачи
                Задачи распределяются по воркерам кластера автоматически

При падении воркера его задачи перераспределяются на оставшихся участников кластера. Это ключевое отличие distributed режима от standalone.


Converters: формат данных в Kafka

Converter определяет, как записи сериализуются при записи в Kafka (source) и десериализуются при чтении из Kafka (sink).

Базовые конвертеры:

Converter ClassОписание
org.apache.kafka.connect.json.JsonConverterJSON с опциональной встроенной схемой
org.apache.kafka.connect.storage.StringConverterСтрока без схемы
org.apache.kafka.connect.converters.ByteArrayConverterСырые байты
io.confluent.connect.avro.AvroConverterAvro с Schema Registry
io.confluent.connect.protobuf.ProtobufConverterProtobuf с Schema Registry

Конвертеры задаются на уровне воркера (по умолчанию) или переопределяются на уровне коннектора:

# Конфигурация конкретного коннектора
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081"
}
NOTE

AvroConverter требует работающего Schema Registry. При сериализации конвертер регистрирует схему в Schema Registry и добавляет 5-байтовый заголовок (magic byte + schema ID) перед Avro-payload. Детально Schema Registry рассматривается в Модуле 06.


REST API: управление коннекторами

В distributed режиме все операции с коннекторами выполняются через REST API на порту 8083.

Основные эндпоинты:

МетодПутьДействие
POST/connectorsСоздать коннектор
GET/connectorsСписок всех коннекторов
GET/connectors/{name}Конфигурация коннектора
GET/connectors/{name}/statusСтатус коннектора и задач
PUT/connectors/{name}/configОбновить конфигурацию
POST/connectors/{name}/restartПерезапустить коннектор
POST/connectors/{name}/pauseПриостановить
POST/connectors/{name}/resumeВозобновить
DELETE/connectors/{name}Удалить коннектор
GET/connectors/{name}/tasks/{id}/statusСтатус конкретной задачи
POST/connectors/{name}/tasks/{id}/restartПерезапустить задачу

Создание коннектора:

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "my-jdbc-source",
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
      "connection.url": "jdbc:postgresql://db:5432/mydb",
      "connection.user": "kafka",
      "connection.password": "secret",
      "table.whitelist": "orders",
      "mode": "incrementing",
      "incrementing.column.name": "id",
      "topic.prefix": "db-",
      "tasks.max": "2"
    }
  }'

Проверка статуса:

curl http://localhost:8083/connectors/my-jdbc-source/status
# {
#   "name": "my-jdbc-source",
#   "connector": { "state": "RUNNING", "worker_id": "worker1:8083" },
#   "tasks": [
#     { "id": 0, "state": "RUNNING", "worker_id": "worker1:8083" },
#     { "id": 1, "state": "RUNNING", "worker_id": "worker2:8083" }
#   ]
# }

Standalone vs Distributed: сравнение

СвойствоStandaloneDistributed
Число воркеров1Несколько (один group.id)
Хранение состоянияЛокальный файлKafka-топики
Перебалансировка задачНетДа (при падении воркера)
REST APIОграниченныйПолный (8083)
Конфигурация коннекторовФайлы .propertiesREST API
Production-использованиеНетДа

Ключевые выводы

  1. Kafka Connect — фреймворк для интеграции без написания producer/consumer кода.
  2. Standalone — для разработки и тестирования. Distributed — для production.
  3. В distributed режиме несколько воркеров с одинаковым group.id образуют кластер и автоматически перераспределяют задачи при отказах.
  4. Внутренние топики (connect-configs, connect-offsets, connect-status) — механизм персистентности состояния Connect.
  5. Converter определяет формат сериализации. AvroConverter требует Schema Registry (Module 06).
  6. REST API на порту 8083 — единственный способ управления коннекторами в distributed режиме.
Проверка знанийKnowledge check
Команда хочет развернуть Kafka Connect для production-нагрузки: несколько коннекторов, высокая доступность, возможность добавлять коннекторы без перезапуска системы. Какой режим выбрать и почему standalone не подходит?
ОтветAnswer
Для production необходим distributed режим. Standalone имеет три критических ограничения: (1) единая точка отказа — один процесс, нет автоматического восстановления задач при падении; (2) состояние (offset позиции) хранится в локальном файле — при потере файла или переезде на другой сервер позиция чтения теряется; (3) нет полного REST API — добавить коннектор без перезапуска процесса невозможно. Distributed режим решает все три проблемы: несколько воркеров автоматически перераспределяют задачи, состояние хранится в Kafka (надёжно и распределённо), REST API позволяет управлять коннекторами на лету. Минимальная конфигурация: group.id + три внутренних топика в connect-distributed.properties.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Команда развёртывает Kafka Connect в production. Какие три внутренних Kafka-топика необходимо создать для distributed режима и каково их назначение?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 7