Learning Platform
Глоссарий Troubleshooting
Урок 06.05 · 20 мин
Продвинутый
Error HandlingDead Letter QueueDLQerrors.toleranceerrors.deadletterqueue

Error Handling и Dead Letter Queue

В production-конвейере данные не всегда идеальны. Записи с некорректным форматом, нарушениями схемы, NULL в обязательных полях, переполнением числовых типов — всё это реальные сценарии. По умолчанию Kafka Connect останавливает задачу при первой же ошибке. Одна некорректная запись из миллиона может остановить весь конвейер.

Правильная стратегия обработки ошибок включает три уровня: толерантность к ошибкам (продолжать или остановиться), DLQ (куда складывать проблемные записи), мониторинг (как узнать, что проблемы есть).


Поведение по умолчанию: errors.tolerance=none

Без явной конфигурации Connect использует errors.tolerance=none. При любой ошибке обработки записи задача переходит в состояние FAILED.

Что считается ошибкой:

  • Ошибка десериализации (неверный формат JSON, несовместимая Avro-схема)
  • Ошибка трансформации в SMT (NullPointerException при попытке обратиться к отсутствующему полю)
  • Ошибка записи в целевую систему (JDBC constraint violation, Elasticsearch mapping conflict)

При errors.tolerance=none одна некорректная запись из миллиона блокирует весь конвейер. После перехода задачи в FAILED автоматического восстановления нет — нужно вручную исправить данные и перезапустить задачу через REST API (POST /connectors/{name}/tasks/{id}/restart).


errors.tolerance=all: пропуск ошибочных записей

errors.tolerance=all указывает Connect продолжать обработку при ошибках, пропуская проблемные записи.

{
  "errors.tolerance": "all"
}

При errors.tolerance=all:

  • Ошибочные записи пропускаются — конвейер не останавливается.
  • По умолчанию проблемные записи теряются (молчаливый сброс), если не настроен DLQ.
  • Ошибки логируются в log-файл воркера (уровень WARN).

Без DLQ это режим «пропусти и забудь»: данные теряются без возможности анализа причины.


Dead Letter Queue: сохранение проблемных записей

DLQ (Dead Letter Queue) — отдельный Kafka-топик, в который записываются проблемные записи. Это позволяет:

  • Не терять данные при ошибках обработки.
  • Анализировать причины ошибок в отдельном потоке.
  • Повторно обработать записи после исправления логики.
{
  "errors.tolerance": "all",
  "errors.deadletterqueue.topic.name": "orders-connector-dlq",
  "errors.deadletterqueue.context.headers.enable": "true"
}
Connect Error Handling с DLQ

Kafka Topic

Kafka Topic: исходный топик. Sink task читает все записи, включая потенциально некорректные.
consume

Sink Task

Sink Task: пытается десериализовать, применить SMT, записать в целевую систему. При ошибке — обращается к error handler.
при успехе
при ошибке

Целевая система

Целевая система: успешно обработанные записи записываются сюда. Offset коммитится только для успешных записей.

DLQ Topic

DLQ Topic: ошибочные записи с заголовками контекста. Отдельный consumer может читать DLQ для анализа, ретрая, оповещений. Retention DLQ должен быть достаточным для реакции команды.

DLQ заголовки: контекст ошибки

При errors.deadletterqueue.context.headers.enable=true каждая запись в DLQ содержит заголовки с контекстом ошибки:

ЗаголовокСодержимое
__connect.errors.topicИсходный топик
__connect.errors.partitionНомер партиции
__connect.errors.offsetOffset исходной записи
__connect.errors.connector.nameИмя коннектора
__connect.errors.task.idID задачи
__connect.errors.stageЭтап, на котором произошла ошибка (converter, transform, task-put)
__connect.errors.class.nameКласс исключения
__connect.errors.exception.messageСообщение исключения
__connect.errors.exception.stacktraceПолный stack trace

Эти заголовки критически важны для диагностики: они позволяют определить, в каком топике, партиции и offset была проблемная запись, на каком этапе возникла ошибка, и что именно пошло не так.


Логирование ошибок

Дополнительно к DLQ можно включить логирование ошибок в log-файл воркера:

{
  "errors.log.enable": "true",
  "errors.log.include.messages": "true"
}
  • errors.log.enable=true — записывает информацию об ошибке в log.
  • errors.log.include.messages=true — включает в лог содержимое проблемной записи (ключ и значение). Осторожно: если запись содержит чувствительные данные — они попадут в лог.

Retry: повторные попытки перед DLQ

Некоторые ошибки временные: база данных недоступна на несколько секунд, Elasticsearch перегружен. Для таких случаев есть retry-механизм:

{
  "errors.retry.delay.max.ms": "60000",
  "errors.retry.timeout": "300000"
}
  • errors.retry.delay.max.ms — максимальная задержка между повторными попытками (экспоненциальный backoff до этого значения).
  • errors.retry.timeout — общее время, в течение которого Connect пробует повторно. После истечения — запись уходит в DLQ (если настроен) или пропускается.

Retry применяется на уровне задачи, не всего коннектора. При исчерпании retry задача не переходит в FAILED при errors.tolerance=all — ошибочная запись просто уходит в DLQ.


Полная production-конфигурация

Рекомендуемая production-конфигурация обработки ошибок:

{
  "name": "orders-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "orders",

    "errors.tolerance": "all",

    "errors.deadletterqueue.topic.name": "orders-sink-dlq",
    "errors.deadletterqueue.context.headers.enable": "true",

    "errors.retry.delay.max.ms": "60000",
    "errors.retry.timeout": "300000",

    "errors.log.enable": "true",
    "errors.log.include.messages": "false"
  }
}
TIP

DLQ топик должен иметь retention.ms достаточным для реакции вашей команды. Рекомендация: 30 дней (2592000000 мс). Если команда обнаружит проблему через неделю — записи должны быть доступны для анализа и ретрая.


Мониторинг DLQ

DLQ — это обычный Kafka-топик. Мониторинг строится на стандартных инструментах:

Consumer lag на DLQ:

kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --describe \
  --group dlq-monitor-group

Если lag растёт — значит в DLQ поступают новые записи и мониторинг-consumer не успевает их обрабатывать. Настройте алерт на lag > N в DLQ топике.

JMX метрики коннектора:

Connect публикует метрики через JMX:

  • kafka.connect:type=connector-task-metrics,connector={name},task={id}record-error-rate, record-error-total
  • kafka.connect:type=task-error-metrics,connector={name},task={id}total-errors-logged, total-record-failures

Prometheus/Grafana: Используйте JMX Exporter для экспорта метрик Connect в Prometheus. Алерт: record-error-rate > 0 в течение 5 минут.


Производственный паттерн: три уровня защиты

Полный production-паттерн обработки ошибок:

Уровень 1: Retry
  errors.retry.timeout=300000 (5 минут)
  errors.retry.delay.max.ms=60000
  → Временные сбои решаются сами

Уровень 2: DLQ
  errors.tolerance=all
  errors.deadletterqueue.topic.name=my-dlq
  errors.deadletterqueue.context.headers.enable=true
  → Некорректные данные изолируются, не останавливают конвейер

Уровень 3: Алертинг
  Алерт на consumer lag в DLQ > 0 в течение 15 минут
  Алерт на record-error-rate через JMX/Prometheus
  → Команда знает о проблемах быстро

Ключевые выводы

  1. errors.tolerance=none (по умолчанию) — одна ошибка = остановка задачи.
  2. errors.tolerance=all — пропуск ошибочных записей, конвейер продолжает работать.
  3. DLQ (errors.deadletterqueue.topic.name) — изоляция проблемных записей в отдельный топик для последующего анализа.
  4. DLQ заголовки (context.headers.enable=true) — полный контекст ошибки: топик, offset, этап, stack trace.
  5. Retry (retry.timeout + retry.delay.max.ms) — временные сбои не попадают в DLQ.
  6. Мониторинг DLQ через consumer lag и JMX-метрики — обязателен в production.
Проверка знанийKnowledge check
Sink connector настроен с errors.tolerance=all и DLQ. За последние 2 часа в DLQ накопилось 15000 записей. Команда хочет понять причину и повторно обработать записи. Какие шаги позволят это сделать?
ОтветAnswer
(1) Диагностика: прочитать записи из DLQ и проверить заголовки __connect.errors.exception.message и __connect.errors.stage — это покажет причину (ошибка десериализации? SMT? insert в БД?) и этап. Заголовок __connect.errors.offset укажет на оригинальные offset в исходном топике. (2) Анализ причины: если это ошибка схемы — исправить конвертер или схему; если constraint violation в БД — проверить данные; если mapping conflict в ES — обновить mapping. (3) Ретрай: написать отдельный consumer, читающий из DLQ, применяющий исправленную логику и публикующий записи в исходный топик или напрямую в целевую систему. Либо исправить коннектор и вручную реплеить из DLQ. Для этого важно иметь retention.ms в DLQ достаточным (рекомендуется 30 дней).

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Connect Sink Connector настроен с errors.tolerance=none. Из 10 000 записей в топике одна запись на offset 5000 имеет некорректный JSON. Что произойдёт при обработке этой записи?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 7