Итоги модуля: Kafka Connect
Модуль 05 охватил полный стек Kafka Connect: от архитектурных основ до production-паттернов обработки ошибок. Перед тем как переходить к Модулю 06 (Schema Registry), убедитесь в понимании ключевых концепций.
Архитектура Connect (Урок 01)
Основные компоненты:
- Worker — JVM-процесс, исполняющий задачи коннекторов
- Connector — описывает задачу и порождает Task-конфигурации
- Task — единица работы (poll() для source, put() для sink)
- Converter — сериализатор/десериализатор (JSON, Avro, Protobuf)
Два режима:
- Standalone — один процесс, локальное хранение offset, нет REST API управления. Только для разработки.
- Distributed — несколько воркеров с общим
group.id. Состояние в Kafka-топиках. Полный REST API на порту 8083. Production-режим.
Внутренние топики distributed режима:
| Топик | Хранит |
|---|---|
connect-configs | Конфигурации коннекторов |
connect-offsets | Позиции source-коннекторов |
connect-status | Статусы коннекторов и задач |
Source Connectors (Урок 02)
Жизненный цикл: poll() → serialize → produce → commit offset в connect-offsets
Ключевые коннекторы:
| Коннектор | Применение | Ограничения |
|---|---|---|
| FileStreamSource | Только учебные примеры | Не production |
| JDBC Source | Реляционные БД (INSERT + UPDATE) | Не видит DELETE |
| Debezium | Полное CDC (INSERT + UPDATE + DELETE) | Читает WAL/binlog |
JDBC Source режимы: bulk, incrementing, timestamp, timestamp+incrementing
Offset источника: хранится в connect-offsets (не в __consumer_offsets — это только sink)
Sink Connectors (Урок 03)
Жизненный цикл: consume → deserialize → put() → commit offset в __consumer_offsets
Ключевые коннекторы:
| Коннектор | Применение | Идемпотентность |
|---|---|---|
| JDBC Sink | Реляционные БД | insert.mode=upsert по PK |
| Elasticsearch Sink | Полнотекстовый поиск, аналитика | key.ignore=false → _id по Kafka-ключу |
| S3 Sink | Data lake, долгосрочное хранение | Атомарная перезапись файла |
S3 форматы: Avro, JSON, Parquet (предпочтительно для аналитики)
Single Message Transforms (Урок 04)
Позиция в pipeline: внутри Worker, до записи в Kafka (source) или до put() (sink)
Синтаксис:
"transforms": "step1,step2,step3"
"transforms.step1.type": "...InsertField$Value"
"transforms.step1.static.field": "env"
"transforms.step1.static.value": "production"
Ключевые встроенные SMT:
| SMT | Действие |
|---|---|
InsertField | Добавить статическое поле или метаданные |
ReplaceField | Переименовать / включить / исключить поля |
MaskField | Замаскировать чувствительные данные |
ValueToKey | Перенести поля из value в key |
TimestampRouter | Изменить топик по timestamp |
RegexRouter | Изменить топик по regex |
Filter + Predicate | Отбросить записи по условию |
Критическое правило: SMT применяются строго слева направо. Порядок определяет результат.
Граница применимости: только per-record, без состояния. Join, aggregate, window → Kafka Streams (Модуль 07).
Error Handling и DLQ (Урок 05)
Стратегии:
errors.tolerance | Поведение | Применение |
|---|---|---|
none (по умолчанию) | Остановить задачу при первой ошибке | Dev/тесты |
all | Пропустить ошибочные записи | Production с DLQ |
Production-конфигурация:
{
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "my-connector-dlq",
"errors.deadletterqueue.context.headers.enable": "true",
"errors.retry.delay.max.ms": "60000",
"errors.retry.timeout": "300000",
"errors.log.enable": "true"
}
DLQ заголовки: __connect.errors.topic, __connect.errors.offset, __connect.errors.exception.message, __connect.errors.stage — полный контекст для диагностики.
Дерево решений: что выбрать
Source или sink connector?
- Нужно читать данные из внешней системы → source connector
- Нужно записывать данные из Kafka во внешнюю систему → sink connector
JDBC Source vs Debezium?
- Нужен только INSERT, задержка в минуты допустима → JDBC Source (проще)
- Нужен полный CDC (INSERT + UPDATE + DELETE), задержка миллисекунды → Debezium
SMT vs Kafka Streams?
- Нужно добавить/переименовать/замаскировать поле per-record → SMT
- Нужен join между топиками, агрегация, windowing → Kafka Streams (Модуль 07)
errors.tolerance=none vs all?
- Dev/тестирование, качество данных гарантировано →
none - Production, данные могут быть некорректными →
all+ DLQ + мониторинг
Связь с Модулем 06: Schema Registry
Connect и Schema Registry тесно интегрированы через AvroConverter. Эта связь важна:
{
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
Что происходит при использовании AvroConverter:
- Source connector производит запись → converter регистрирует Avro-схему в Schema Registry и получает числовой schema ID.
- Каждое сообщение в Kafka содержит 5-байтовый заголовок (0x00 magic byte + 4-байтовый schema ID) перед Avro-payload.
- Sink connector читает запись → converter извлекает schema ID из заголовка, получает схему из Schema Registry, десериализует payload.
Критическая зависимость: если Schema Registry недоступна при запуске коннектора с AvroConverter — коннектор не сможет ни сериализовать, ни десериализовать данные. Задача перейдёт в FAILED.
Модуль 06 детально рассматривает: wire format (5-байтовый заголовок), все 7 режимов совместимости (BACKWARD, FORWARD, FULL, NONE + транзитивные варианты), subject naming strategies, и REST API Schema Registry.
Перед изучением Модуля 06 рекомендуется убедиться, что вы понимаете: (1) как AvroConverter Connect использует Schema Registry, (2) что такое schema ID в 5-байтовом заголовке, (3) почему Schema Registry обязателен в production-конвейерах с Avro. Это заложит правильную ментальную модель для изучения compatibility modes.
Ключевые выводы модуля
- Kafka Connect — фреймворк для интеграции без написания producer/consumer кода. Distributed режим для production.
- Source connector (poll → offset в connect-offsets) и sink connector (put → offset в __consumer_offsets) — симметричные концепции.
- JDBC Source покрывает большинство реляционных сценариев; Debezium — для полного CDC.
- SMT обрабатывают записи per-record в task thread; для stateful логики — Kafka Streams.
- Production error handling: errors.tolerance=all + DLQ + context headers + мониторинг lag.
- AvroConverter создаёт жёсткую зависимость от Schema Registry — детали в Модуле 06.