Learning Platform
Глоссарий Troubleshooting
Урок 04.04 · 25 мин
Продвинутый
RebalancingKIP-848CooperativeEagerConsumerGroupHeartbeat

Протоколы ребалансировки

Ребалансировка — это процесс перераспределения партиций между consumer группы при изменении её состава. Kafka прошла через три поколения протоколов ребалансировки: от простого, но дорогостоящего «стоп-мир» подхода до современного асинхронного сервер-driven протокола KIP-848, ставшего стандартом в Kafka 4.0.


Когда происходит ребалансировка

Ребалансировка инициируется при следующих событиях:

Триггеры ребалансировки

Новый consumer присоединяется

Consumer join: новый consumer присоединяется к группе. Group coordinator получает JoinGroup (legacy) или ConsumerGroupHeartbeat (KIP-848). Нужно перераспределить партиции с учётом нового участника.

Consumer уходит или падает

Consumer leave/crash: consumer вызвал close() — отправил LeaveGroup и группа ребалансируется немедленно. Или сработал session.timeout.ms — consumer перестал отправлять heartbeat, coordinator счёл его мёртвым.

Изменение топиков подписки

Изменение подписки: consumer вызвал subscribe() с новым списком топиков. Нужно переназначить партиции с учётом новых или удалённых топиков.

Изменение числа партиций топика

Изменение числа партиций: администратор добавил партиции в топик (kafka-topics.sh --alter). Новые партиции нужно назначить consumer группы.

Исторический контекст: Legacy Eager Protocol (JoinGroup/SyncGroup)

До Kafka 4.0 стандартным протоколом была eager (жадная) ребалансировка. В ней все consumer одновременно прекращали обработку, освобождали партиции, и ждали нового назначения.

NOTE

Этот протокол использовался до Kafka 4.0. В современных версиях он доступен как fallback через group.protocol=classic. Знание его работы полезно для понимания, что именно изменил KIP-848.

Как работала eager ребалансировка:

  1. Stop-the-world: все consumer в группе отзывают все назначенные партиции одновременно — сразу все сообщения перестают обрабатываться.
  2. JoinGroup: все consumer отправляют JoinGroup запрос group coordinator. Coordinator ждёт ответа от каждого.
  3. Синхронизационный барьер: ни один consumer не получает новое назначение, пока все не отправили JoinGroup.
  4. SyncGroup: coordinator выбирает group leader (первый ответивший consumer), тот вычисляет новое назначение и отправляет его всем через SyncGroup ответ.
  5. Возобновление: consumer начинают читать с новых партиций.

Проблема: при группе из 100 consumer добавление одного нового consumer останавливало все 100 consumer на время ребалансировки (иногда 30+ секунд в больших группах). В Kafka 4.0 это group.protocol=classic — доступно как fallback, но не рекомендуется.


Cooperative Incremental Protocol

Промежуточное решение: cooperative ребалансировка выполняется в два этапа, затрагивая только партиции, которые действительно нужно переместить.

Cooperative Incremental: двухфазный подход

Фаза 1: Revoke только затронутых партиций

Фаза 1 — Revoke: coordinator определяет, какие партиции нужно переместить. ТОЛЬКО их владельцы отзывают партиции. Остальные consumer продолжают работу без прерывания.
продолжают работу

Фаза 2: Assign новому consumer

Фаза 2 — Assign: отозванные партиции назначаются новому consumer. Остальные consumer всё время работали — нет глобального стоп-мира.

Лучше eager, но всё ещё требует двух round trip к coordinator. Включается через partition.assignment.strategy=cooperative-sticky (legacy protocol). В Kafka 4.0 вытеснен KIP-848.


KIP-848: Next-Gen Protocol (стандарт Kafka 4.0)

KIP-848 — это фундаментальная переработка архитектуры ребалансировки. Вместо того чтобы consumer синхронизировались между собой через coordinator, сервер сам управляет назначением через новый ConsumerGroupHeartbeat API.

KIP-848 vs Legacy: сравнение протоколов

Legacy Protocol (group.protocol=classic)

Legacy (JoinGroup/SyncGroup): все consumer останавливаются → ждут JoinGroup от всех → group leader вычисляет назначение → SyncGroup → все возобновляют работу. Глобальный барьер синхронизации.
  1. Consumer leaves
Шаг 1: Consumer 1 уходит. Coordinator ждёт LeaveGroup. Инициирует JoinGroup для всех оставшихся consumer.
  1. ALL consumers send JoinGroup (stop!)
Шаг 2: ВСЕ consumer отправляют JoinGroup. Производительность группы — 0. Весь трафик остановлен.
  1. SyncGroup response (30-60s in large groups)
Шаг 3: SyncGroup от group leader. Новое назначение разослано. В больших группах (100+ consumer) этот процесс занимает 30-60 секунд.

KIP-848 (group.protocol=consumer)

KIP-848 (ConsumerGroupHeartbeat): consumer периодически отправляет heartbeat с желаемыми топиками. Сервер сам управляет назначением. Нет синхронизационного барьера.
  1. Consumer leaves
Шаг 1: Consumer 1 уходит. Сервер получает информацию через heartbeat timeout или LeaveGroup.
  1. Server computes new assignment (async)
Шаг 2: Сервер асинхронно вычисляет новое назначение. Остальные consumer продолжают работать без остановки.
  1. Targeted heartbeat response (milliseconds)
Шаг 3: Следующий heartbeat затронутого consumer несёт новое назначение. Только этот consumer применяет изменение. Ускорение до 20x в больших группах.

ConsumerGroupHeartbeat API

KIP-848 вводит новый API-эндпоинт на брокере: ConsumerGroupHeartbeat. Consumer периодически отправляет heartbeat, содержащий:

  • group.id — идентификатор группы
  • member.id — уникальный ID этого экземпляра
  • subscribed.topics — список топиков для подписки
  • current.assignment — текущие назначенные TopicPartition

Брокер отвечает с target.assignment — новым желаемым назначением. Consumer применяет его инкрементально: сначала отзывает лишние партиции, затем берёт новые — без глобальной синхронизации.

TIP

KIP-848 — это стандарт для Kafka 4.0+. Если вы видите учебные материалы, описывающие JoinGroup/SyncGroup как основной протокол — они устарели. В современном коде с Kafka 4.0 используйте group.protocol=consumer.


Как включить KIP-848

Настройка на стороне consumer:

from kafka import KafkaConsumer

# KIP-848: явная настройка для Kafka 4.0
consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id='payments-group',
    # Явное включение нового протокола (Kafka 4.0)
    # В Kafka 4.2 это значение по умолчанию
    # group_protocol='consumer',  # если поддерживается клиентом
    auto_offset_reset='earliest',
)

В server.properties брокера (Kafka 4.0):

# Включён по умолчанию на стороне сервера в Kafka 4.0
# Consumer должен явно opt-in через group.protocol=consumer
group.coordinator.rebalance.protocols=classic,consumer
NOTE

В Python клиенте kafka-python поддержка KIP-848 добавляется начиная с версии 3.0. Для полного использования нового протокола используйте официальный Confluent Python Client (confluent-kafka-python) или Java client, где поддержка KIP-848 полная. В учебных целях с kafka_sim.py логика ребалансировки абстрагирована — фокус на концепциях.


Производительность KIP-848

На практике выигрыш масштабируется с размером группы:

СценарийLegacy EagerKIP-848
10 consumer, 1 покидает~2 сек downtime~100 мс
100 consumer, 1 покидает~30 сек downtime~200 мс
1000 consumer, 1 покидает60+ сек downtime~500 мс

Источник улучшения: в legacy протоколе coordinator ждёт JoinGroup от всех N consumer, даже если изменение касается только 1. В KIP-848 только затронутые consumer получают новое назначение.

KIP-848: сервер-driven назначение

Consumer A: Heartbeat (P0, P1)

Consumer A: отправляет ConsumerGroupHeartbeat с subscribed.topics=['orders'] и current.assignment=[P0, P1]. Продолжает обрабатывать P0 и P1.

Consumer B: Heartbeat (P2, P3)

Consumer B: отправляет ConsumerGroupHeartbeat с subscribed.topics=['orders'] и current.assignment=[P2, P3]. Активно обрабатывает P2 и P3.

Consumer C: Heartbeat (new, empty)

Consumer C (новый): первый heartbeat. current.assignment=[] — нет назначения. Сервер должен выделить ему партиции.
сервер вычисляет

Broker: вычисляет новое назначение асинхронно

Group Coordinator (сервер): получает три heartbeat. Вычисляет новое назначение: A получает P0, B получает P2-P3, C получает P1. Только Consumer A и C получат изменённое назначение в следующем heartbeat response. Consumer B продолжает без изменений.

A: target=[P0] → revoke P1

Consumer A: получает target.assignment=[P0] в heartbeat response. Отзывает P1, продолжает P0. Никакой остановки всей группы.

B: target=[P2,P3] (no change)

Consumer B: heartbeat response без изменений. B продолжает P2 и P3 без прерывания.

C: target=[P1] → assign P1

Consumer C: получает target.assignment=[P1]. Начинает читать P1 с закоммиченного offset (или earliest/latest по конфигурации).

Итог

Три поколения протоколов ребалансировки: eager (JoinGroup/SyncGroup) — глобальная остановка группы; cooperative incremental — двухфазный подход с минимальным прерыванием; KIP-848 (ConsumerGroupHeartbeat) — сервер-driven, асинхронный, без синхронизационного барьера. KIP-848 — стандарт Kafka 4.0, включается через group.protocol=consumer, является дефолтом в Kafka 4.2.

Проверка знанийKnowledge check
Почему KIP-848 (next-gen протокол) значительно быстрее при ребалансировке большой consumer group по сравнению с legacy eager протоколом?
ОтветAnswer
Legacy eager протокол требует глобального синхронизационного барьера: coordinator ждёт JoinGroup от ВСЕХ N consumer группы, прежде чем выдать новое назначение. Это означает, что добавление одного consumer останавливает всю группу до завершения ребалансировки — время растёт с размером группы. KIP-848 устраняет этот барьер: сервер управляет назначением асинхронно. Только те consumer, чьи партиции изменились, получают новое назначение в следующем heartbeat response. Остальные consumer продолжают работать без прерывания. Отсутствие глобальной синхронизации даёт ускорение до 20x в больших группах.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. Consumer group использует legacy eager protocol (group.protocol=classic). Группа состоит из 50 consumer с 100 партициями. Один consumer падает. Что происходит с производительностью группы в момент ребалансировки?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 7