Learning Platform
Глоссарий Troubleshooting
Урок 06.03 · 20 мин
Средний
Sink ConnectorJdbcSinkElasticsearchSinkS3 SinkExactly-once Sink

Sink Connectors

Sink connector читает данные из Kafka и записывает их во внешнюю систему. Это зеркальная задача по отношению к source connector: вместо чтения из внешней системы и записи в Kafka — чтение из Kafka и запись в хранилище, поисковый движок или объектное хранилище.


Жизненный цикл sink connector

Понимание жизненного цикла sink connector объясняет семантику доставки и поведение при сбоях.

  1. Инициализация. Worker вызывает SinkConnector.start(props) — коннектор проверяет конфигурацию и устанавливает соединение с целевой системой.
  2. Подписка. Каждая задача (SinkTask) подписывается на партиции исходного топика как обычный Kafka-консьюмер.
  3. Получение записей. Worker вызывает SinkTask.put(records) с пакетом SinkRecord. Размер пакета определяется consumer.max.poll.records.
  4. Запись во внешнюю систему. Задача записывает пакет в целевую систему (базу данных, Elasticsearch, S3).
  5. Фиксация offset. После успешного возврата из put() Worker коммитит offset в __consumer_offsets. Если put() бросает исключение — offset не коммитится, пакет будет обработан повторно (at-least-once).
Жизненный цикл Sink Connector

Kafka Topic

Kafka Topic: исходный топик, из которого читает sink connector. Sink task подписывается на партиции этого топика как обычный consumer с group.id = connector name.
consume

Sink Task

Sink Task: единица работы sink connector. Получает пакет SinkRecord через метод put(records). Каждый SinkRecord содержит: topic, partition, offset, key, value — уже десериализованные через converter.
put(records)

Целевая система

Целевая система: база данных (JDBC), поисковый движок (Elasticsearch), объектное хранилище (S3), хранилище данных (Snowflake, BigQuery). Sink connector абстрагирует специфику записи за стандартным интерфейсом.
commit offset (on success)
__consumer_offsetsСтандартный Kafka-топик для хранения consumer group offset. Sink connector использует его как обычный consumer — commit offset вызывается только после успешного выполнения put(). Если put() завершается с ошибкой — offset не коммитится, задача обработает тот же пакет повторно.

JDBC Sink Connector

JdbcSinkConnector записывает данные из Kafka в реляционную базу данных. Поддерживает PostgreSQL, MySQL, Oracle, SQL Server.

{
  "name": "jdbc-orders-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://db:5432/warehouse",
    "connection.user": "kafka_sink",
    "connection.password": "secret",

    "topics": "db-orders",
    "auto.create": "true",
    "auto.evolve": "true",

    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "pk.fields": "id",

    "tasks.max": "2"
  }
}

Режимы вставки

Режим (insert.mode)SQL-операцияКогда использовать
insertINSERT INTOТолько новые записи, дубликаты не ожидаются
upsertINSERT ... ON CONFLICT UPDATEОбновления существующих строк, идемпотентность
updateUPDATE WHERE pk=?Только обновления, ошибка если строки нет

Режим upsert обеспечивает идемпотентность записи — при повторной обработке одного и того же события (at-least-once доставка) таблица остаётся консистентной.

Конфигурация первичного ключа

pk.modeИсточник PKПрименение
noneНет PK — только INSERTЛог-таблицы без обновлений
kafkaTopic + partition + offsetУникальный PK из метаданных Kafka
record_keyПоля из ключа сообщенияБизнес-ключ (user_id, order_id)
record_valueПоля из значения сообщенияPK встроен в тело записи

Auto-create и auto-evolve

  • auto.create=true — автоматически создаёт таблицу при первом запуске, если её нет.
  • auto.evolve=true — автоматически добавляет новые столбцы при изменении схемы сообщений.
NOTE

При использовании AvroConverter с Schema Registry JDBC Sink автоматически получает схему для каждого сообщения и создаёт/эволюционирует таблицу в соответствии с Avro-схемой. Это одна из ключевых точек интеграции между Kafka Connect (Модуль 05) и Schema Registry (Модуль 06).


Elasticsearch Sink Connector

ElasticsearchSinkConnector индексирует сообщения из Kafka в Elasticsearch. Полезен для полнотекстового поиска, аналитических дашбордов (Kibana), log aggregation.

{
  "name": "elasticsearch-events-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch:9200",
    "type.name": "_doc",

    "topics": "user-events",
    "key.ignore": "false",

    "schema.ignore": "false",
    "compact.map.fixed.schema": "false",

    "tasks.max": "3"
  }
}

Параметр key.ignore:

  • false (по умолчанию) — использует ключ сообщения Kafka как _id документа Elasticsearch. Повторная запись того же ключа обновляет документ (идемпотентно).
  • true — использует комбинацию topic+partition+offset как _id. Каждое сообщение — уникальный документ.

Для режима полнотекстового поиска, где одно событие = один документ, используйте key.ignore=false с бизнес-ключом (user_id, event_id) — это обеспечивает идемпотентность при повторных доставках.


S3 Sink Connector

S3SinkConnector записывает пакеты сообщений из Kafka в Amazon S3 (или S3-совместимое хранилище: MinIO, GCS). Подходит для data lake ingestion, долгосрочного хранения, аналитических конвейеров.

{
  "name": "s3-events-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "s3.bucket.name": "my-data-lake",
    "s3.region": "us-east-1",

    "topics": "user-events",
    "topics.dir": "kafka-data",

    "flush.size": "1000",
    "rotate.interval.ms": "600000",

    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "schema.compatibility": "FULL",

    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "locale": "ru_RU",
    "timezone": "Europe/Moscow",

    "tasks.max": "4"
  }
}

Форматы файлов

format.classРасширениеОсобенности
AvroFormat.avroКомпактный бинарный формат, встроенная схема, поддержка schema evolution
JsonFormat.jsonЧитаемый текст, больший размер, без компрессии схемы
ParquetFormat.snappy.parquetКолоночное хранение, оптимально для аналитики (Spark, Athena)

Партиционеры S3

Параметр partitioner.class определяет структуру директорий в S3:

ПартиционерСтруктура путиПрименение
DefaultPartitioner{topic}/{partition}/Простая структура, нет временного разбиения
TimeBasedPartitioneryear=2024/month=01/day=15/hour=10/Партиционирование по времени — оптимально для Athena/Hive
FieldPartitioner{field_value}/Партиционирование по значению поля (например, region=EU/)
DailyPartitioneryear=2024/month=01/day=15/Ежедневные папки

Когда создаётся новый S3-файл

S3 Sink создаёт новый файл при любом из условий:

  1. Накоплено flush.size записей.
  2. Прошло rotate.interval.ms миллисекунд с последней ротации.
  3. Произошла смена временного раздела (при TimeBasedPartitioner).

Tracking consumer offset: sink как обычный consumer

Sink connector использует стандартный Kafka consumer под капотом. Group ID коннектора = имя коннектора. Consumer offset хранится в __consumer_offsets.

Это означает:

  • Инструменты мониторинга consumer lag (Kafka UI, Grafana + kafka_exporter) работают для sink коннекторов из коробки.
  • kafka-consumer-groups.sh --group my-sink-connector покажет отставание задач.
  • При перезапуске задача продолжит с последнего закоммиченного offset.

Exactly-once для sink connectors

По умолчанию sink connector обеспечивает at-least-once: при сбое задача повторно обработает последний незакоммиченный пакет. Внешняя система может получить дублирующиеся записи.

Стратегии достижения идемпотентности:

Целевая системаМеханизмКонфигурация
JDBCUpsert по первичному ключуinsert.mode=upsert, pk.mode=record_key
ElasticsearchUpsert по _idkey.ignore=false с бизнес-ключом
S3Перезапись файла (атомарная операция)Ключ файла детерминирован — повторная запись = идемпотентно
NOTE

Именно здесь AvroConverter и Schema Registry дают преимущество: схема автоматически передаётся между source и sink, что позволяет JDBC Sink создавать таблицу с правильными типами без ручной настройки DDL. Детальная интеграция описана в Модуле 06.


Схема конвейера с sink connectors

Kafka Connect Sink Pipeline

Kafka Topic

Kafka Topic: один топик может потребляться несколькими sink connectors одновременно. Каждый коннектор — отдельная consumer group с независимым offset.

Sink Task

Sink Task (Worker): читает записи из Kafka через consumer API, десериализует через converter, передаёт в SinkTask.put(). После успеха — коммитит offset.
три потребителя одного топика

PostgreSQL

PostgreSQL (JDBC Sink): upsert-режим по PK обеспечивает идемпотентность. При повторной доставке строка обновляется, не дублируется. auto.create создаёт таблицу автоматически.

Elasticsearch

Elasticsearch (ES Sink): индексирует каждую запись как документ. key.ignore=false использует Kafka-ключ как _id, обеспечивая idempotent upsert при повторах.

S3 Data Lake

S3 (S3 Sink): буферизует записи до достижения flush.size или rotate.interval.ms, затем создаёт файл в S3. TimeBasedPartitioner организует файлы по дате/часу для Athena-запросов.

Ключевые выводы

  1. Sink connector читает из Kafka как обычный consumer и пишет во внешнюю систему через SinkTask.put().
  2. Offset хранится в __consumer_offsets — стандартный механизм Kafka.
  3. JDBC Sink поддерживает upsert для идемпотентной записи. auto.create создаёт таблицу автоматически.
  4. Elasticsearch Sinkkey.ignore=false использует бизнес-ключ как _id для idempotent upsert.
  5. S3 Sink буферизует данные до flush.size или rotate.interval.ms. Parquet + TimeBasedPartitioner оптимально для аналитики.
  6. AvroConverter с Schema Registry (Модуль 06) обеспечивает автоматическую передачу схемы между source и sink.
Проверка знанийKnowledge check
Sink connector записывает данные в PostgreSQL с insert.mode=insert. Worker crashes на полпути через пакет из 500 записей — 300 записей уже записано в базу, offset не закоммичен. После перезапуска задача снова получит тот же пакет из 500 записей. Что произойдёт с 300 уже записанными строками?
ОтветAnswer
При insert.mode=insert произойдёт дублирование: 300 строк будут вставлены повторно. Если у таблицы есть UNIQUE или PRIMARY KEY constraint — 300 INSERT завершатся ошибкой constraint violation, и весь пакет может упасть с ошибкой (поведение зависит от конфигурации error.tolerance). Решение: использовать insert.mode=upsert с pk.mode=record_key. Upsert через INSERT ... ON CONFLICT DO UPDATE идемпотентен: повторная запись той же строки обновляет её, а не создаёт дубликат. Это позволяет безопасно обрабатывать пакет повторно при at-least-once семантике.
ClickHouse: MaterializedPostgreSQL/MySQL ClickPipes: managed CDC в ClickHouse Cloud

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. Sink connector вызывает SinkTask.put(records) с пакетом из 200 записей. После записи 150-й записи в PostgreSQL приложение выбрасывает исключение. Offset не закоммичен. Что произойдёт при следующем вызове put()?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 7