Error Handling и Dead Letter Queue
В production-конвейере данные не всегда идеальны. Записи с некорректным форматом, нарушениями схемы, NULL в обязательных полях, переполнением числовых типов — всё это реальные сценарии. По умолчанию Kafka Connect останавливает задачу при первой же ошибке. Одна некорректная запись из миллиона может остановить весь конвейер.
Правильная стратегия обработки ошибок включает три уровня: толерантность к ошибкам (продолжать или остановиться), DLQ (куда складывать проблемные записи), мониторинг (как узнать, что проблемы есть).
Поведение по умолчанию: errors.tolerance=none
Без явной конфигурации Connect использует errors.tolerance=none. При любой ошибке обработки записи задача переходит в состояние FAILED.
Что считается ошибкой:
- Ошибка десериализации (неверный формат JSON, несовместимая Avro-схема)
- Ошибка трансформации в SMT (NullPointerException при попытке обратиться к отсутствующему полю)
- Ошибка записи в целевую систему (JDBC constraint violation, Elasticsearch mapping conflict)
При errors.tolerance=none одна некорректная запись из миллиона блокирует весь конвейер. После перехода задачи в FAILED автоматического восстановления нет — нужно вручную исправить данные и перезапустить задачу через REST API (POST /connectors/{name}/tasks/{id}/restart).
errors.tolerance=all: пропуск ошибочных записей
errors.tolerance=all указывает Connect продолжать обработку при ошибках, пропуская проблемные записи.
{
"errors.tolerance": "all"
}
При errors.tolerance=all:
- Ошибочные записи пропускаются — конвейер не останавливается.
- По умолчанию проблемные записи теряются (молчаливый сброс), если не настроен DLQ.
- Ошибки логируются в log-файл воркера (уровень WARN).
Без DLQ это режим «пропусти и забудь»: данные теряются без возможности анализа причины.
Dead Letter Queue: сохранение проблемных записей
DLQ (Dead Letter Queue) — отдельный Kafka-топик, в который записываются проблемные записи. Это позволяет:
- Не терять данные при ошибках обработки.
- Анализировать причины ошибок в отдельном потоке.
- Повторно обработать записи после исправления логики.
{
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "orders-connector-dlq",
"errors.deadletterqueue.context.headers.enable": "true"
}
Kafka Topic
Kafka Topic: исходный топик. Sink task читает все записи, включая потенциально некорректные.Sink Task
Sink Task: пытается десериализовать, применить SMT, записать в целевую систему. При ошибке — обращается к error handler.Целевая система
Целевая система: успешно обработанные записи записываются сюда. Offset коммитится только для успешных записей.DLQ Topic
DLQ Topic: ошибочные записи с заголовками контекста. Отдельный consumer может читать DLQ для анализа, ретрая, оповещений. Retention DLQ должен быть достаточным для реакции команды.DLQ заголовки: контекст ошибки
При errors.deadletterqueue.context.headers.enable=true каждая запись в DLQ содержит заголовки с контекстом ошибки:
| Заголовок | Содержимое |
|---|---|
__connect.errors.topic | Исходный топик |
__connect.errors.partition | Номер партиции |
__connect.errors.offset | Offset исходной записи |
__connect.errors.connector.name | Имя коннектора |
__connect.errors.task.id | ID задачи |
__connect.errors.stage | Этап, на котором произошла ошибка (converter, transform, task-put) |
__connect.errors.class.name | Класс исключения |
__connect.errors.exception.message | Сообщение исключения |
__connect.errors.exception.stacktrace | Полный stack trace |
Эти заголовки критически важны для диагностики: они позволяют определить, в каком топике, партиции и offset была проблемная запись, на каком этапе возникла ошибка, и что именно пошло не так.
Логирование ошибок
Дополнительно к DLQ можно включить логирование ошибок в log-файл воркера:
{
"errors.log.enable": "true",
"errors.log.include.messages": "true"
}
errors.log.enable=true— записывает информацию об ошибке в log.errors.log.include.messages=true— включает в лог содержимое проблемной записи (ключ и значение). Осторожно: если запись содержит чувствительные данные — они попадут в лог.
Retry: повторные попытки перед DLQ
Некоторые ошибки временные: база данных недоступна на несколько секунд, Elasticsearch перегружен. Для таких случаев есть retry-механизм:
{
"errors.retry.delay.max.ms": "60000",
"errors.retry.timeout": "300000"
}
errors.retry.delay.max.ms— максимальная задержка между повторными попытками (экспоненциальный backoff до этого значения).errors.retry.timeout— общее время, в течение которого Connect пробует повторно. После истечения — запись уходит в DLQ (если настроен) или пропускается.
Retry применяется на уровне задачи, не всего коннектора. При исчерпании retry задача не переходит в FAILED при errors.tolerance=all — ошибочная запись просто уходит в DLQ.
Полная production-конфигурация
Рекомендуемая production-конфигурация обработки ошибок:
{
"name": "orders-sink-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"topics": "orders",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "orders-sink-dlq",
"errors.deadletterqueue.context.headers.enable": "true",
"errors.retry.delay.max.ms": "60000",
"errors.retry.timeout": "300000",
"errors.log.enable": "true",
"errors.log.include.messages": "false"
}
}
DLQ топик должен иметь retention.ms достаточным для реакции вашей команды. Рекомендация: 30 дней (2592000000 мс). Если команда обнаружит проблему через неделю — записи должны быть доступны для анализа и ретрая.
Мониторинг DLQ
DLQ — это обычный Kafka-топик. Мониторинг строится на стандартных инструментах:
Consumer lag на DLQ:
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--describe \
--group dlq-monitor-group
Если lag растёт — значит в DLQ поступают новые записи и мониторинг-consumer не успевает их обрабатывать. Настройте алерт на lag > N в DLQ топике.
JMX метрики коннектора:
Connect публикует метрики через JMX:
kafka.connect:type=connector-task-metrics,connector={name},task={id}—record-error-rate,record-error-totalkafka.connect:type=task-error-metrics,connector={name},task={id}—total-errors-logged,total-record-failures
Prometheus/Grafana: Используйте JMX Exporter для экспорта метрик Connect в Prometheus. Алерт: record-error-rate > 0 в течение 5 минут.
Производственный паттерн: три уровня защиты
Полный production-паттерн обработки ошибок:
Уровень 1: Retry
errors.retry.timeout=300000 (5 минут)
errors.retry.delay.max.ms=60000
→ Временные сбои решаются сами
Уровень 2: DLQ
errors.tolerance=all
errors.deadletterqueue.topic.name=my-dlq
errors.deadletterqueue.context.headers.enable=true
→ Некорректные данные изолируются, не останавливают конвейер
Уровень 3: Алертинг
Алерт на consumer lag в DLQ > 0 в течение 15 минут
Алерт на record-error-rate через JMX/Prometheus
→ Команда знает о проблемах быстро
Ключевые выводы
- errors.tolerance=none (по умолчанию) — одна ошибка = остановка задачи.
- errors.tolerance=all — пропуск ошибочных записей, конвейер продолжает работать.
- DLQ (
errors.deadletterqueue.topic.name) — изоляция проблемных записей в отдельный топик для последующего анализа. - DLQ заголовки (
context.headers.enable=true) — полный контекст ошибки: топик, offset, этап, stack trace. - Retry (retry.timeout + retry.delay.max.ms) — временные сбои не попадают в DLQ.
- Мониторинг DLQ через consumer lag и JMX-метрики — обязателен в production.