Log Compaction
Стандартная политика retention в Kafka удаляет данные по времени или объёму. Но что если вам нужно хранить не историю всех событий, а только последнее известное состояние по каждому ключу? Именно это обеспечивает log compaction — механизм дедупликации по ключу.
Когда нужен compaction
Log compaction предназначен для топиков, используемых как changelog — журнал изменений состояния. Примеры:
- User profiles: каждое обновление профиля пользователя публикуется с ключом
user_id. Важно только последнее состояние — история промежуточных изменений не нужна потребителям, которые строят кэш. - Account balances: текущий баланс по
account_id. Каждая транзакция обновляет баланс, но downstream-система нуждается только в актуальном значении. - Device shadow: IoT-устройство периодически публикует своё состояние. Важно последнее известное состояние.
- Kafka Streams state stores: Kafka Streams сохраняет changelog state stores в compacted топиках для восстановления после рестарта.
Без compaction такой топик будет расти бесконечно — каждое обновление добавляет запись, и старые версии накапливаются. С compaction гарантируется, что для каждого ключа хранится хотя бы одна запись с последним значением.
Как работает compaction: head и tail
Kafka разделяет лог на две части:
Tail — скомпактированная часть. Здесь каждый ключ присутствует не более одного раза. Cleaner thread уже обработал эту область, оставив только последнюю на момент compaction запись для каждого ключа.
Head — неуплотнённая часть. Это недавние записи, которые ещё не прошли через compaction. Здесь один и тот же ключ может встречаться несколько раз.
Процесс compaction
Cleaner thread (log.cleaner.threads, по умолчанию 1) выполняет следующую работу:
- Сканирует head лога, строит в памяти хэш-карту
{key → offset}— для каждого ключа запоминает офсет последней записи - Сканирует tail: записи, для которых существует более новая версия в head, помечаются как удалённые
- Создаёт новые сегменты без удалённых записей
- Atomically заменяет старые сегменты новыми
Compaction не изменяет офсеты оставшихся записей. Если запись имела offset=100 и прошла через compaction, она сохраняет offset=100. Это важно: consumers, работающие с compacted топиком, могут видеть «дыры» в офсетах (например, 10, 50, 100), потому что промежуточные записи были удалены.
До и после compaction
min.cleanable.dirty.ratio
Параметр min.cleanable.dirty.ratio (по умолчанию 0.5) определяет, при каком соотношении «грязных» (head) данных к общему объёму лога запускается compaction.
dirty ratio = bytes_in_head / (bytes_in_head + bytes_in_tail)
При значении 0.5 compaction запускается, когда не менее 50% лога — это неуплотнённые данные. Меньшее значение (например, 0.2) означает более агрессивное и частое компактирование — лог чище, но потребляет больше ресурсов cleaner thread.
min.cleanable.dirty.ratio | Поведение |
|---|---|
| 0.1 | Очень агрессивное компактирование. Лог всегда чистый, но CPU нагрузка высокая. |
| 0.5 | Умеренное. Компактор запускается, когда половина лога — новые неуплотнённые данные. |
| 0.9 | Редкое компактирование. Экономит CPU, но лог может быть менее уплотнённым. |
Tombstone: удаление ключа
Как удалить ключ из compacted топика? Обычные записи только обновляют значение — они никогда не удаляют ключ полностью. Для удаления используются tombstones (надгробные камни).
Tombstone — это запись с заданным ключом и value=null. После того как cleaner thread видит tombstone, ключ помечается для полного удаления.
producer.send(topic="user-profiles", key="user-42", value=None)
Tombstone удаляется не мгновенно. После compaction tombstone-запись сохраняется в течение delete.retention.ms (по умолчанию 86400000 мс = 24 часа). Это время необходимо, чтобы consumer группы успели прочитать tombstone и обновить свои состояния. Если потребитель обработает логовый сегмент только после истечения delete.retention.ms, он никогда не узнает об удалении ключа — данные в его кэше останутся устаревшими.
После истечения delete.retention.ms tombstone полностью удаляется. Ключ больше не появляется в логе — потребитель, читающий с начала, не найдёт никаких записей для этого ключа.
max.compaction.lag.ms
Параметр max.compaction.lag.ms гарантирует, что запись в head лога не будет ждать compaction дольше заданного времени. Это временной верхний предел для cleaner thread.
Без этого параметра в случае низкого трафика (dirty ratio никогда не достигает min.cleanable.dirty.ratio) compaction может не запускаться долгое время.
Конфигурация compacted топика
# Создание compacted топика
kafka-topics.sh --bootstrap-server localhost:9092 \
--create \
--topic user-profiles \
--partitions 3 \
--replication-factor 3 \
--config cleanup.policy=compact \
--config min.cleanable.dirty.ratio=0.3 \
--config delete.retention.ms=3600000 \
--config max.compaction.lag.ms=3600000
Для топика с одновременной retention по времени и compaction используется cleanup.policy=compact,delete — сначала удаляются записи старше retention, затем compaction убирает дубли.