Sink Connectors
Sink connector читает данные из Kafka и записывает их во внешнюю систему. Это зеркальная задача по отношению к source connector: вместо чтения из внешней системы и записи в Kafka — чтение из Kafka и запись в хранилище, поисковый движок или объектное хранилище.
Жизненный цикл sink connector
Понимание жизненного цикла sink connector объясняет семантику доставки и поведение при сбоях.
- Инициализация. Worker вызывает
SinkConnector.start(props)— коннектор проверяет конфигурацию и устанавливает соединение с целевой системой. - Подписка. Каждая задача (SinkTask) подписывается на партиции исходного топика как обычный Kafka-консьюмер.
- Получение записей. Worker вызывает
SinkTask.put(records)с пакетомSinkRecord. Размер пакета определяетсяconsumer.max.poll.records. - Запись во внешнюю систему. Задача записывает пакет в целевую систему (базу данных, Elasticsearch, S3).
- Фиксация offset. После успешного возврата из
put()Worker коммитит offset в__consumer_offsets. Еслиput()бросает исключение — offset не коммитится, пакет будет обработан повторно (at-least-once).
Kafka Topic
Kafka Topic: исходный топик, из которого читает sink connector. Sink task подписывается на партиции этого топика как обычный consumer с group.id = connector name.Sink Task
Sink Task: единица работы sink connector. Получает пакет SinkRecord через метод put(records). Каждый SinkRecord содержит: topic, partition, offset, key, value — уже десериализованные через converter.Целевая система
Целевая система: база данных (JDBC), поисковый движок (Elasticsearch), объектное хранилище (S3), хранилище данных (Snowflake, BigQuery). Sink connector абстрагирует специфику записи за стандартным интерфейсом.JDBC Sink Connector
JdbcSinkConnector записывает данные из Kafka в реляционную базу данных. Поддерживает PostgreSQL, MySQL, Oracle, SQL Server.
{
"name": "jdbc-orders-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:postgresql://db:5432/warehouse",
"connection.user": "kafka_sink",
"connection.password": "secret",
"topics": "db-orders",
"auto.create": "true",
"auto.evolve": "true",
"insert.mode": "upsert",
"pk.mode": "record_key",
"pk.fields": "id",
"tasks.max": "2"
}
}
Режимы вставки
Режим (insert.mode) | SQL-операция | Когда использовать |
|---|---|---|
insert | INSERT INTO | Только новые записи, дубликаты не ожидаются |
upsert | INSERT ... ON CONFLICT UPDATE | Обновления существующих строк, идемпотентность |
update | UPDATE WHERE pk=? | Только обновления, ошибка если строки нет |
Режим upsert обеспечивает идемпотентность записи — при повторной обработке одного и того же события (at-least-once доставка) таблица остаётся консистентной.
Конфигурация первичного ключа
pk.mode | Источник PK | Применение |
|---|---|---|
none | Нет PK — только INSERT | Лог-таблицы без обновлений |
kafka | Topic + partition + offset | Уникальный PK из метаданных Kafka |
record_key | Поля из ключа сообщения | Бизнес-ключ (user_id, order_id) |
record_value | Поля из значения сообщения | PK встроен в тело записи |
Auto-create и auto-evolve
auto.create=true— автоматически создаёт таблицу при первом запуске, если её нет.auto.evolve=true— автоматически добавляет новые столбцы при изменении схемы сообщений.
При использовании AvroConverter с Schema Registry JDBC Sink автоматически получает схему для каждого сообщения и создаёт/эволюционирует таблицу в соответствии с Avro-схемой. Это одна из ключевых точек интеграции между Kafka Connect (Модуль 05) и Schema Registry (Модуль 06).
Elasticsearch Sink Connector
ElasticsearchSinkConnector индексирует сообщения из Kafka в Elasticsearch. Полезен для полнотекстового поиска, аналитических дашбордов (Kibana), log aggregation.
{
"name": "elasticsearch-events-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connection.url": "http://elasticsearch:9200",
"type.name": "_doc",
"topics": "user-events",
"key.ignore": "false",
"schema.ignore": "false",
"compact.map.fixed.schema": "false",
"tasks.max": "3"
}
}
Параметр key.ignore:
false(по умолчанию) — использует ключ сообщения Kafka как_idдокумента Elasticsearch. Повторная запись того же ключа обновляет документ (идемпотентно).true— использует комбинациюtopic+partition+offsetкак_id. Каждое сообщение — уникальный документ.
Для режима полнотекстового поиска, где одно событие = один документ, используйте key.ignore=false с бизнес-ключом (user_id, event_id) — это обеспечивает идемпотентность при повторных доставках.
S3 Sink Connector
S3SinkConnector записывает пакеты сообщений из Kafka в Amazon S3 (или S3-совместимое хранилище: MinIO, GCS). Подходит для data lake ingestion, долгосрочного хранения, аналитических конвейеров.
{
"name": "s3-events-sink",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"s3.bucket.name": "my-data-lake",
"s3.region": "us-east-1",
"topics": "user-events",
"topics.dir": "kafka-data",
"flush.size": "1000",
"rotate.interval.ms": "600000",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
"schema.compatibility": "FULL",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"locale": "ru_RU",
"timezone": "Europe/Moscow",
"tasks.max": "4"
}
}
Форматы файлов
format.class | Расширение | Особенности |
|---|---|---|
AvroFormat | .avro | Компактный бинарный формат, встроенная схема, поддержка schema evolution |
JsonFormat | .json | Читаемый текст, больший размер, без компрессии схемы |
ParquetFormat | .snappy.parquet | Колоночное хранение, оптимально для аналитики (Spark, Athena) |
Партиционеры S3
Параметр partitioner.class определяет структуру директорий в S3:
| Партиционер | Структура пути | Применение |
|---|---|---|
DefaultPartitioner | {topic}/{partition}/ | Простая структура, нет временного разбиения |
TimeBasedPartitioner | year=2024/month=01/day=15/hour=10/ | Партиционирование по времени — оптимально для Athena/Hive |
FieldPartitioner | {field_value}/ | Партиционирование по значению поля (например, region=EU/) |
DailyPartitioner | year=2024/month=01/day=15/ | Ежедневные папки |
Когда создаётся новый S3-файл
S3 Sink создаёт новый файл при любом из условий:
- Накоплено
flush.sizeзаписей. - Прошло
rotate.interval.msмиллисекунд с последней ротации. - Произошла смена временного раздела (при TimeBasedPartitioner).
Tracking consumer offset: sink как обычный consumer
Sink connector использует стандартный Kafka consumer под капотом. Group ID коннектора = имя коннектора. Consumer offset хранится в __consumer_offsets.
Это означает:
- Инструменты мониторинга consumer lag (Kafka UI, Grafana + kafka_exporter) работают для sink коннекторов из коробки.
kafka-consumer-groups.sh --group my-sink-connectorпокажет отставание задач.- При перезапуске задача продолжит с последнего закоммиченного offset.
Exactly-once для sink connectors
По умолчанию sink connector обеспечивает at-least-once: при сбое задача повторно обработает последний незакоммиченный пакет. Внешняя система может получить дублирующиеся записи.
Стратегии достижения идемпотентности:
| Целевая система | Механизм | Конфигурация |
|---|---|---|
| JDBC | Upsert по первичному ключу | insert.mode=upsert, pk.mode=record_key |
| Elasticsearch | Upsert по _id | key.ignore=false с бизнес-ключом |
| S3 | Перезапись файла (атомарная операция) | Ключ файла детерминирован — повторная запись = идемпотентно |
Именно здесь AvroConverter и Schema Registry дают преимущество: схема автоматически передаётся между source и sink, что позволяет JDBC Sink создавать таблицу с правильными типами без ручной настройки DDL. Детальная интеграция описана в Модуле 06.
Схема конвейера с sink connectors
Kafka Topic
Kafka Topic: один топик может потребляться несколькими sink connectors одновременно. Каждый коннектор — отдельная consumer group с независимым offset.Sink Task
Sink Task (Worker): читает записи из Kafka через consumer API, десериализует через converter, передаёт в SinkTask.put(). После успеха — коммитит offset.PostgreSQL
PostgreSQL (JDBC Sink): upsert-режим по PK обеспечивает идемпотентность. При повторной доставке строка обновляется, не дублируется. auto.create создаёт таблицу автоматически.Elasticsearch
Elasticsearch (ES Sink): индексирует каждую запись как документ. key.ignore=false использует Kafka-ключ как _id, обеспечивая idempotent upsert при повторах.S3 Data Lake
S3 (S3 Sink): буферизует записи до достижения flush.size или rotate.interval.ms, затем создаёт файл в S3. TimeBasedPartitioner организует файлы по дате/часу для Athena-запросов.Ключевые выводы
- Sink connector читает из Kafka как обычный consumer и пишет во внешнюю систему через
SinkTask.put(). - Offset хранится в
__consumer_offsets— стандартный механизм Kafka. - JDBC Sink поддерживает
upsertдля идемпотентной записи.auto.createсоздаёт таблицу автоматически. - Elasticsearch Sink —
key.ignore=falseиспользует бизнес-ключ как_idдля idempotent upsert. - S3 Sink буферизует данные до
flush.sizeилиrotate.interval.ms. Parquet + TimeBasedPartitioner оптимально для аналитики. - AvroConverter с Schema Registry (Модуль 06) обеспечивает автоматическую передачу схемы между source и sink.