Learning Platform
Глоссарий Troubleshooting
Урок 06.07 · 10 мин
Средний
SummaryReview

Итоги модуля: Kafka Connect

Модуль 05 охватил полный стек Kafka Connect: от архитектурных основ до production-паттернов обработки ошибок. Перед тем как переходить к Модулю 06 (Schema Registry), убедитесь в понимании ключевых концепций.


Архитектура Connect (Урок 01)

Основные компоненты:

  • Worker — JVM-процесс, исполняющий задачи коннекторов
  • Connector — описывает задачу и порождает Task-конфигурации
  • Task — единица работы (poll() для source, put() для sink)
  • Converter — сериализатор/десериализатор (JSON, Avro, Protobuf)

Два режима:

  • Standalone — один процесс, локальное хранение offset, нет REST API управления. Только для разработки.
  • Distributed — несколько воркеров с общим group.id. Состояние в Kafka-топиках. Полный REST API на порту 8083. Production-режим.

Внутренние топики distributed режима:

ТопикХранит
connect-configsКонфигурации коннекторов
connect-offsetsПозиции source-коннекторов
connect-statusСтатусы коннекторов и задач

Source Connectors (Урок 02)

Жизненный цикл: poll() → serialize → produce → commit offset в connect-offsets

Ключевые коннекторы:

КоннекторПрименениеОграничения
FileStreamSourceТолько учебные примерыНе production
JDBC SourceРеляционные БД (INSERT + UPDATE)Не видит DELETE
DebeziumПолное CDC (INSERT + UPDATE + DELETE)Читает WAL/binlog

JDBC Source режимы: bulk, incrementing, timestamp, timestamp+incrementing

Offset источника: хранится в connect-offsets (не в __consumer_offsets — это только sink)


Sink Connectors (Урок 03)

Жизненный цикл: consume → deserialize → put() → commit offset в __consumer_offsets

Ключевые коннекторы:

КоннекторПрименениеИдемпотентность
JDBC SinkРеляционные БДinsert.mode=upsert по PK
Elasticsearch SinkПолнотекстовый поиск, аналитикаkey.ignore=false → _id по Kafka-ключу
S3 SinkData lake, долгосрочное хранениеАтомарная перезапись файла

S3 форматы: Avro, JSON, Parquet (предпочтительно для аналитики)


Single Message Transforms (Урок 04)

Позиция в pipeline: внутри Worker, до записи в Kafka (source) или до put() (sink)

Синтаксис:

"transforms": "step1,step2,step3"
"transforms.step1.type": "...InsertField$Value"
"transforms.step1.static.field": "env"
"transforms.step1.static.value": "production"

Ключевые встроенные SMT:

SMTДействие
InsertFieldДобавить статическое поле или метаданные
ReplaceFieldПереименовать / включить / исключить поля
MaskFieldЗамаскировать чувствительные данные
ValueToKeyПеренести поля из value в key
TimestampRouterИзменить топик по timestamp
RegexRouterИзменить топик по regex
Filter + PredicateОтбросить записи по условию

Критическое правило: SMT применяются строго слева направо. Порядок определяет результат.

Граница применимости: только per-record, без состояния. Join, aggregate, window → Kafka Streams (Модуль 07).


Error Handling и DLQ (Урок 05)

Стратегии:

errors.toleranceПоведениеПрименение
none (по умолчанию)Остановить задачу при первой ошибкеDev/тесты
allПропустить ошибочные записиProduction с DLQ

Production-конфигурация:

{
  "errors.tolerance": "all",
  "errors.deadletterqueue.topic.name": "my-connector-dlq",
  "errors.deadletterqueue.context.headers.enable": "true",
  "errors.retry.delay.max.ms": "60000",
  "errors.retry.timeout": "300000",
  "errors.log.enable": "true"
}

DLQ заголовки: __connect.errors.topic, __connect.errors.offset, __connect.errors.exception.message, __connect.errors.stage — полный контекст для диагностики.


Дерево решений: что выбрать

Source или sink connector?

  • Нужно читать данные из внешней системы → source connector
  • Нужно записывать данные из Kafka во внешнюю систему → sink connector

JDBC Source vs Debezium?

  • Нужен только INSERT, задержка в минуты допустима → JDBC Source (проще)
  • Нужен полный CDC (INSERT + UPDATE + DELETE), задержка миллисекунды → Debezium

SMT vs Kafka Streams?

  • Нужно добавить/переименовать/замаскировать поле per-record → SMT
  • Нужен join между топиками, агрегация, windowing → Kafka Streams (Модуль 07)

errors.tolerance=none vs all?

  • Dev/тестирование, качество данных гарантировано → none
  • Production, данные могут быть некорректными → all + DLQ + мониторинг

Связь с Модулем 06: Schema Registry

Connect и Schema Registry тесно интегрированы через AvroConverter. Эта связь важна:

{
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081"
}

Что происходит при использовании AvroConverter:

  1. Source connector производит запись → converter регистрирует Avro-схему в Schema Registry и получает числовой schema ID.
  2. Каждое сообщение в Kafka содержит 5-байтовый заголовок (0x00 magic byte + 4-байтовый schema ID) перед Avro-payload.
  3. Sink connector читает запись → converter извлекает schema ID из заголовка, получает схему из Schema Registry, десериализует payload.

Критическая зависимость: если Schema Registry недоступна при запуске коннектора с AvroConverter — коннектор не сможет ни сериализовать, ни десериализовать данные. Задача перейдёт в FAILED.

Модуль 06 детально рассматривает: wire format (5-байтовый заголовок), все 7 режимов совместимости (BACKWARD, FORWARD, FULL, NONE + транзитивные варианты), subject naming strategies, и REST API Schema Registry.

TIP

Перед изучением Модуля 06 рекомендуется убедиться, что вы понимаете: (1) как AvroConverter Connect использует Schema Registry, (2) что такое schema ID в 5-байтовом заголовке, (3) почему Schema Registry обязателен в production-конвейерах с Avro. Это заложит правильную ментальную модель для изучения compatibility modes.


Ключевые выводы модуля

  1. Kafka Connect — фреймворк для интеграции без написания producer/consumer кода. Distributed режим для production.
  2. Source connector (poll → offset в connect-offsets) и sink connector (put → offset в __consumer_offsets) — симметричные концепции.
  3. JDBC Source покрывает большинство реляционных сценариев; Debezium — для полного CDC.
  4. SMT обрабатывают записи per-record в task thread; для stateful логики — Kafka Streams.
  5. Production error handling: errors.tolerance=all + DLQ + context headers + мониторинг lag.
  6. AvroConverter создаёт жёсткую зависимость от Schema Registry — детали в Модуле 06.
Проверка знанийKnowledge check
Проектируете конвейер: PostgreSQL (таблица payments) → Kafka → Elasticsearch. Нужно: (1) захватывать UPDATE и DELETE, (2) маскировать поле card_number, (3) продолжать работу при некорректных записях. Назовите компоненты конвейера и их конфигурацию.
ОтветAnswer
(1) Source connector: Debezium PostgreSQL Connector (не JDBC Source, так как нужен UPDATE и DELETE). Конфигурация: connector.class=io.debezium.connector.postgresql.PostgresConnector, database.server.name, table.include.list=public.payments. (2) SMT: transforms=maskCard, transforms.maskCard.type=org.apache.kafka.connect.transforms.MaskField$Value, transforms.maskCard.fields=card_number — применяется в source connector до записи в Kafka. (3) Sink connector: ElasticsearchSinkConnector с key.ignore=false для idempotent upsert. (4) Error handling в обоих коннекторах: errors.tolerance=all, errors.deadletterqueue.topic.name=payments-dlq, errors.deadletterqueue.context.headers.enable=true. Мониторинг: алерт на lag в DLQ топике.

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 7