Протоколы ребалансировки
Ребалансировка — это процесс перераспределения партиций между consumer группы при изменении её состава. Kafka прошла через три поколения протоколов ребалансировки: от простого, но дорогостоящего «стоп-мир» подхода до современного асинхронного сервер-driven протокола KIP-848, ставшего стандартом в Kafka 4.0.
Когда происходит ребалансировка
Ребалансировка инициируется при следующих событиях:
Новый consumer присоединяется
Consumer join: новый consumer присоединяется к группе. Group coordinator получает JoinGroup (legacy) или ConsumerGroupHeartbeat (KIP-848). Нужно перераспределить партиции с учётом нового участника.Consumer уходит или падает
Consumer leave/crash: consumer вызвал close() — отправил LeaveGroup и группа ребалансируется немедленно. Или сработал session.timeout.ms — consumer перестал отправлять heartbeat, coordinator счёл его мёртвым.Изменение топиков подписки
Изменение подписки: consumer вызвал subscribe() с новым списком топиков. Нужно переназначить партиции с учётом новых или удалённых топиков.Изменение числа партиций топика
Изменение числа партиций: администратор добавил партиции в топик (kafka-topics.sh --alter). Новые партиции нужно назначить consumer группы.Исторический контекст: Legacy Eager Protocol (JoinGroup/SyncGroup)
До Kafka 4.0 стандартным протоколом была eager (жадная) ребалансировка. В ней все consumer одновременно прекращали обработку, освобождали партиции, и ждали нового назначения.
Этот протокол использовался до Kafka 4.0. В современных версиях он доступен как fallback через group.protocol=classic. Знание его работы полезно для понимания, что именно изменил KIP-848.
Как работала eager ребалансировка:
- Stop-the-world: все consumer в группе отзывают все назначенные партиции одновременно — сразу все сообщения перестают обрабатываться.
- JoinGroup: все consumer отправляют JoinGroup запрос group coordinator. Coordinator ждёт ответа от каждого.
- Синхронизационный барьер: ни один consumer не получает новое назначение, пока все не отправили JoinGroup.
- SyncGroup: coordinator выбирает group leader (первый ответивший consumer), тот вычисляет новое назначение и отправляет его всем через SyncGroup ответ.
- Возобновление: consumer начинают читать с новых партиций.
Проблема: при группе из 100 consumer добавление одного нового consumer останавливало все 100 consumer на время ребалансировки (иногда 30+ секунд в больших группах). В Kafka 4.0 это group.protocol=classic — доступно как fallback, но не рекомендуется.
Cooperative Incremental Protocol
Промежуточное решение: cooperative ребалансировка выполняется в два этапа, затрагивая только партиции, которые действительно нужно переместить.
Фаза 1: Revoke только затронутых партиций
Фаза 1 — Revoke: coordinator определяет, какие партиции нужно переместить. ТОЛЬКО их владельцы отзывают партиции. Остальные consumer продолжают работу без прерывания.Фаза 2: Assign новому consumer
Фаза 2 — Assign: отозванные партиции назначаются новому consumer. Остальные consumer всё время работали — нет глобального стоп-мира.Лучше eager, но всё ещё требует двух round trip к coordinator. Включается через partition.assignment.strategy=cooperative-sticky (legacy protocol). В Kafka 4.0 вытеснен KIP-848.
KIP-848: Next-Gen Protocol (стандарт Kafka 4.0)
KIP-848 — это фундаментальная переработка архитектуры ребалансировки. Вместо того чтобы consumer синхронизировались между собой через coordinator, сервер сам управляет назначением через новый ConsumerGroupHeartbeat API.
Legacy Protocol (group.protocol=classic)
Legacy (JoinGroup/SyncGroup): все consumer останавливаются → ждут JoinGroup от всех → group leader вычисляет назначение → SyncGroup → все возобновляют работу. Глобальный барьер синхронизации.- Consumer leaves
- ALL consumers send JoinGroup (stop!)
- SyncGroup response (30-60s in large groups)
KIP-848 (group.protocol=consumer)
KIP-848 (ConsumerGroupHeartbeat): consumer периодически отправляет heartbeat с желаемыми топиками. Сервер сам управляет назначением. Нет синхронизационного барьера.- Consumer leaves
- Server computes new assignment (async)
- Targeted heartbeat response (milliseconds)
ConsumerGroupHeartbeat API
KIP-848 вводит новый API-эндпоинт на брокере: ConsumerGroupHeartbeat. Consumer периодически отправляет heartbeat, содержащий:
group.id— идентификатор группыmember.id— уникальный ID этого экземпляраsubscribed.topics— список топиков для подпискиcurrent.assignment— текущие назначенные TopicPartition
Брокер отвечает с target.assignment — новым желаемым назначением. Consumer применяет его инкрементально: сначала отзывает лишние партиции, затем берёт новые — без глобальной синхронизации.
KIP-848 — это стандарт для Kafka 4.0+. Если вы видите учебные материалы, описывающие JoinGroup/SyncGroup как основной протокол — они устарели. В современном коде с Kafka 4.0 используйте group.protocol=consumer.
Как включить KIP-848
Настройка на стороне consumer:
from kafka import KafkaConsumer
# KIP-848: явная настройка для Kafka 4.0
consumer = KafkaConsumer(
bootstrap_servers=['localhost:9092'],
group_id='payments-group',
# Явное включение нового протокола (Kafka 4.0)
# В Kafka 4.2 это значение по умолчанию
# group_protocol='consumer', # если поддерживается клиентом
auto_offset_reset='earliest',
)
В server.properties брокера (Kafka 4.0):
# Включён по умолчанию на стороне сервера в Kafka 4.0
# Consumer должен явно opt-in через group.protocol=consumer
group.coordinator.rebalance.protocols=classic,consumer
В Python клиенте kafka-python поддержка KIP-848 добавляется начиная с версии 3.0. Для полного использования нового протокола используйте официальный Confluent Python Client (confluent-kafka-python) или Java client, где поддержка KIP-848 полная. В учебных целях с kafka_sim.py логика ребалансировки абстрагирована — фокус на концепциях.
Производительность KIP-848
На практике выигрыш масштабируется с размером группы:
| Сценарий | Legacy Eager | KIP-848 |
|---|---|---|
| 10 consumer, 1 покидает | ~2 сек downtime | ~100 мс |
| 100 consumer, 1 покидает | ~30 сек downtime | ~200 мс |
| 1000 consumer, 1 покидает | 60+ сек downtime | ~500 мс |
Источник улучшения: в legacy протоколе coordinator ждёт JoinGroup от всех N consumer, даже если изменение касается только 1. В KIP-848 только затронутые consumer получают новое назначение.
Consumer A: Heartbeat (P0, P1)
Consumer A: отправляет ConsumerGroupHeartbeat с subscribed.topics=['orders'] и current.assignment=[P0, P1]. Продолжает обрабатывать P0 и P1.Consumer B: Heartbeat (P2, P3)
Consumer B: отправляет ConsumerGroupHeartbeat с subscribed.topics=['orders'] и current.assignment=[P2, P3]. Активно обрабатывает P2 и P3.Consumer C: Heartbeat (new, empty)
Consumer C (новый): первый heartbeat. current.assignment=[] — нет назначения. Сервер должен выделить ему партиции.Broker: вычисляет новое назначение асинхронно
Group Coordinator (сервер): получает три heartbeat. Вычисляет новое назначение: A получает P0, B получает P2-P3, C получает P1. Только Consumer A и C получат изменённое назначение в следующем heartbeat response. Consumer B продолжает без изменений.A: target=[P0] → revoke P1
Consumer A: получает target.assignment=[P0] в heartbeat response. Отзывает P1, продолжает P0. Никакой остановки всей группы.B: target=[P2,P3] (no change)
Consumer B: heartbeat response без изменений. B продолжает P2 и P3 без прерывания.C: target=[P1] → assign P1
Consumer C: получает target.assignment=[P1]. Начинает читать P1 с закоммиченного offset (или earliest/latest по конфигурации).Итог
Три поколения протоколов ребалансировки: eager (JoinGroup/SyncGroup) — глобальная остановка группы; cooperative incremental — двухфазный подход с минимальным прерыванием; KIP-848 (ConsumerGroupHeartbeat) — сервер-driven, асинхронный, без синхронизационного барьера. KIP-848 — стандарт Kafka 4.0, включается через group.protocol=consumer, является дефолтом в Kafka 4.2.