Справочник ключевых терминов курса Apache Kafka.
Фундаментальная абстракция Apache Kafka: упорядоченная, неизменяемая последовательность записей, реплицированная по нескольким брокерам. Каждая запись добавляется в конец лога и сохраняет своё смещение навсегда. Именно эта модель отличает Kafka от традиционных очередей сообщений: потребители читают лог независимо, не удаляя записи.
# Kafka как commit log:
# Запись добавляется в конец, offset монотонно растёт
# offset: 0 1 2 3 4 5 6
# data: A B C D E F G
# ^--- Log End Offset (LEO)
# Consumer group A читает с offset 3
# Consumer group B читает с offset 6
# Записи не удаляются после чтенияОтдельный процесс сервера Kafka, хранящий данные и обслуживающий запросы продюсеров и потребителей. Каждый брокер идентифицируется уникальным числом broker.id и обслуживает набор партиций. Брокеры образуют кластер, обмениваясь метаданными через протокол KRaft или (исторически) через ZooKeeper.
# server.properties — базовая конфигурация брокера
broker.id=1
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://kafka-1.example.com:9092
log.dirs=/var/kafka/data
num.partitions=3
default.replication.factor=3
min.insync.replicas=2Именованный логический канал для организации потока событий в Kafka. Топик разбивается на партиции, каждая из которых является независимым упорядоченным логом. Топик не привязан к конкретному брокеру: его партиции могут быть распределены по всему кластеру. Retention-политика определяет, как долго хранятся записи.
# Создание топика через CLI
kafka-topics.sh --bootstrap-server localhost:9092 \
--create \
--topic orders \
--partitions 12 \
--replication-factor 3 \
--config retention.ms=604800000 \
--config cleanup.policy=deleteБазовая единица параллелизма и хранения в Kafka. Партиция представляет собой упорядоченный, только-для-добавления лог записей. Каждая партиция хранится на одном брокере-лидере и реплицируется на несколько брокеров-фолловеров. Количество партиций определяет максимальный параллелизм чтения в потребительской группе.
# Топик 'orders' с 3 партициями на кластере из 3 брокеров:
# Partition 0: Leader=broker-1, Replicas=[1,2,3], ISR=[1,2,3]
# Partition 1: Leader=broker-2, Replicas=[2,3,1], ISR=[2,3,1]
# Partition 2: Leader=broker-3, Replicas=[3,1,2], ISR=[3,1,2]
kafka-topics.sh --describe --topic orders --bootstrap-server localhost:9092Уникальный монотонно возрастающий целочисленный идентификатор записи внутри конкретной партиции. Смещение позволяет потребителям точно отслеживать позицию чтения: в случае перезапуска потребитель возобновляет чтение с последнего зафиксированного смещения. Смещения хранятся во внутреннем топике __consumer_offsets.
# Фиксация смещения вручную в Java
consumer.poll(Duration.ofMillis(100)).forEach(record -> {
processRecord(record);
// Фиксация после обработки каждой записи
consumer.commitSync(Collections.singletonMap(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)
));
});Физический файл на диске, в котором хранится часть записей партиции. Каждая партиция состоит из одного активного и нескольких закрытых сегментов. Kafka создаёт новый сегмент при достижении порога log.segment.bytes или log.roll.ms. К каждому сегменту прилагаются индексные файлы (.index, .timeindex) для быстрого поиска по смещению и времени.
# Структура файлов партиции на диске
/var/kafka/data/orders-0/
00000000000000000000.log # сегмент от offset 0
00000000000000000000.index # индекс смещений
00000000000000000000.timeindex # индекс по времени
00000000000001048576.log # следующий сегмент
00000000000001048576.index
00000000000001048576.timeindexСовокупность брокеров Kafka, совместно обслуживающих топики и обеспечивающих высокую доступность. В режиме KRaft часть брокеров выполняет роль контроллеров, формируя кворум метаданных. Клиенты подключаются к любому брокеру через bootstrap-список и автоматически получают актуальные метаданные обо всём кластере.
# Подключение клиента к кластеру
bootstrap.servers=kafka-1:9092,kafka-2:9092,kafka-3:9092
# Клиент запрашивает метаданные у первого доступного брокера
# и узнаёт адреса всех брокеров и лидеров партиций
# Последующие запросы идут напрямую к нужному брокеруМножество реплик партиции, которые отстают от лидера не более чем на replica.lag.time.max.ms миллисекунд. Только записи, подтверждённые всеми репликами из ISR, считаются зафиксированными (committed). Если реплика выходит из ISR, лидер может продолжить работу; если ISR сжимается ниже min.insync.replicas, лидер начинает отклонять записи продюсеров с acks=all.
# Мониторинг ISR через CLI
kafka-topics.sh --describe --topic orders --bootstrap-server localhost:9092
# Topic: orders Partition: 0 Leader: 1 Replicas: 1,2,3 ISR: 1,2 Offline: 3
# ^ broker-3 выпал из ISR — под-реплицированная партиция!
# JMX метрика: kafka.server:type=ReplicaManager,name=UnderReplicatedPartitionsМаксимальное смещение в партиции, которое подтверждено всеми репликами из ISR. Потребители не могут прочитать записи выше High Watermark: это гарантирует, что они видят только зафиксированные данные, которые не будут потеряны при сбое лидера. Лидер периодически сообщает фолловерам актуальное значение HW через ответы на Fetch-запросы.
# Соотношение LEO и HW:
# Leader LEO = 10, Follower-1 LEO = 10, Follower-2 LEO = 8
# ISR = {Leader, Follower-1, Follower-2}
# HW = min(LEO всех в ISR) = 8
# Потребитель прочитает только offset 0..7
# После репликации Follower-2: HW продвинется до 10
# JMX: kafka.log:type=Log,name=LogEndOffset,topic=orders,partition=0Смещение следующей записи, которая будет добавлена в лог партиции на данной реплике. LEO на лидере всегда больше или равен LEO любого фолловера. Разница между LEO лидера и LEO фолловера определяет отставание реплики. Когда все реплики из ISR достигают одного LEO, High Watermark продвигается до этого значения.
# Состояние реплик партиции
# Запись смещений на каждой реплике:
# Leader: LEO=15 (принял все записи)
# Follower-1: LEO=15 (в ISR, в синхронизации)
# Follower-2: LEO=12 (отстаёт, ещё в ISR)
# Follower-3: LEO=5 (выбыл из ISR)
# HW = 12 (min LEO среди ISR)
# Лаг Follower-2: 15 - 12 = 3 записиМонотонно возрастающий счётчик, увеличивающийся при каждой смене лидера партиции. Epoch служит fencing-токеном: новый лидер использует epoch для обнаружения и отклонения устаревших запросов от бывшего лидера. Это предотвращает сценарий split-brain, когда два брокера одновременно считают себя лидером и могут записывать конфликтующие данные.
# Формат ответа на Fetch-запрос фолловера
# LeaderEpoch передаётся с каждым ответом
# Фолловер проверяет: если epoch изменился, нужно усечь лог
# Файл leader-epoch-checkpoint хранит историю
# /var/kafka/data/orders-0/leader-epoch-checkpoint
# 0
# 3 (epoch=3 начался с offset=0)
# 5 (epoch=5 начался с offset=200)
# Epoch 5 — текущий лидерВстроенный протокол консенсуса Kafka (с версии 2.8, обязателен с 4.0), заменивший ZooKeeper. Метаданные кластера хранятся во внутреннем топике __cluster_metadata с фактором репликации, равным количеству контроллеров. Алгоритм Raft обеспечивает консенсус между контроллерами без внешних зависимостей, упрощая развёртывание и снижая задержку операций с метаданными.
# KRaft-конфигурация для комбинированного узла
# (брокер + контроллер)
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
listeners=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
controller.listener.names=CONTROLLER
# Инициализация кластера
kafka-storage.sh format -t <cluster-id> -c server.propertiesГруппа KRaft-контроллеров, которые через протокол Raft управляют метаданными кластера: списком брокеров, конфигурациями топиков, назначением лидеров партиций. Один контроллер является активным (active controller), остальные — резервными (standby). Топик __cluster_metadata хранит все изменения в виде event log.
# Проверка статуса кворума контроллеров
kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --status
# ClusterId: abc123xyz
# LeaderId: 1
# LeaderEpoch: 5
# HighWatermark: 1248
# MaxFollowerLag: 0
# MaxFollowerLagTimeMs: 120
# CurrentVoters: [1,2,3]
# CurrentObservers: []Выбор нового лидера партиции из реплик, не входящих в ISR, при отсутствии доступных синхронизированных реплик. Включается параметром unclean.leader.election.enable=true. Обеспечивает доступность ценой возможной потери данных: новый лидер не имеет всех зафиксированных записей и перезапишет часть лога. По умолчанию отключён для систем, требующих гарантий сохранности данных.
# Параметр топика — включить только при критической необходимости
kafka-configs.sh --bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name orders \
--alter \
--add-config unclean.leader.election.enable=true
# Риск: если было 5 незафиксированных записей,
# после выборов нечистого лидера они будут потеряны
# и потребитель увидит дыру или перечитает старые данныеБуфер в памяти продюсера, группирующий записи в батчи перед отправкой на брокер. Для каждой пары (топик, партиция) поддерживается своя очередь ProducerBatch. Запись считается готовой к отправке, когда батч достиг batch.size байт или истёк linger.ms. Record Accumulator — главный механизм повышения пропускной способности продюсера.
# Настройка аккумулятора для высокой пропускной способности
Properties props = new Properties();
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536); // 64 KB
props.put(ProducerConfig.LINGER_MS_CONFIG, 10); // ждать до 10 мс
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 32 MB общий буфер
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // сжатие батча
// При linger.ms=10 продюсер агрегирует записи 10 мс,
// затем отправляет один большой батч вместо многих мелкихРежим работы продюсера, гарантирующий доставку каждой записи ровно один раз (exactly once) в одну партицию. Брокер присваивает продюсеру уникальный Producer ID (PID) и отслеживает порядковый номер (sequence number) каждой записи. Дублирующиеся записи с одинаковым PID и sequence отбрасываются. Включается параметром enable.idempotence=true (по умолчанию с Kafka 3.0).
Properties props = new Properties();
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Автоматически устанавливает:
// acks=all, retries=Integer.MAX_VALUE, max.in.flight=5
// Брокер хранит: {PID=42, partition=0, lastSeq=100}
// Запись с seq=100 при повторе будет отброшена
// Запись с seq=101 будет принята как новаяМеханизм атомарной записи в несколько партиций и топиков в рамках одной транзакции. Используется для реализации exactly-once semantics (EOS) в сценариях consume-transform-produce. Продюсер с transactional.id взаимодействует с transaction coordinator — специальным брокером, хранящим состояние транзакции в топике __transaction_state.
producer.initTransactions();
try {
producer.beginTransaction();
// Запись в несколько топиков атомарна
producer.send(new ProducerRecord<>("orders", key, value));
producer.send(new ProducerRecord<>("audit-log", key, audit));
// Фиксация смещений потребителя в рамках транзакции
producer.sendOffsetsToTransaction(offsets, consumerGroupMetadata);
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}Параметр продюсера, определяющий, сколько реплик должны подтвердить получение записи, прежде чем брокер ответит продюсеру об успехе. Три значения: acks=0 — не ждать подтверждения (максимальная скорость, возможна потеря); acks=1 — ждать только лидера (риск потери при немедленном падении); acks=all (или -1) — ждать всех реплик из ISR (максимальная надёжность).
# acks=all + min.insync.replicas=2 — золотой стандарт
# server.properties (глобально)
default.replication.factor=3
min.insync.replicas=2
# producer.properties
acks=all
# Семантика: запись подтверждена, если 2 из 3 реплик
# записали её на диск. При падении одного брокера
# данные не потеряны — у двух других есть копияСтратегия выбора партиции для записи на основе ключа сообщения. DefaultPartitioner применяет алгоритм murmur2 к ключу и вычисляет номер партиции: partition = murmur2(key) % numPartitions. Записи с одинаковым ключом гарантированно попадают в одну партицию, что обеспечивает упорядоченность для конкретного ключа. При null-ключе используется sticky partitioner.
// Запись с ключом — гарантия порядка для user_id
ProducerRecord<String, Order> record = new ProducerRecord<>(
"orders",
"user-42", // ключ — всегда та же партиция
new Order(orderId, items)
);
producer.send(record);
// Кастомный партиционер для равномерного распределения
public class RoundRobinPartitioner implements Partitioner {
public int partition(String topic, Object key,
byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
return (int)(counter.getAndIncrement() % cluster.partitionCountForTopic(topic));
}
}Логическая группа потребителей с общим group.id, совместно читающих топик. Каждая партиция назначается ровно одному потребителю внутри группы, обеспечивая параллельную обработку без дублирования. Несколько групп могут независимо читать один топик — каждая получает полную копию данных. Текущая позиция каждой группы в каждой партиции хранится в топике __consumer_offsets.
Properties props = new Properties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("orders"));
// Все экземпляры с group.id="order-processor" делят партицииСпециальный брокер, ответственный за управление конкретной группой потребителей: отслеживание живых членов, инициирование ребалансировок и хранение зафиксированных смещений. Координатор определяется по hash(group.id) % numPartitions(__consumer_offsets). Каждый потребитель при старте регистрируется у своего координатора через запрос FindCoordinator.
# Определение брокера-координатора для группы
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--describe \
--group order-processor
# Вывод: GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
# Coordinator находится на брокере, ответственном за
# партицию hash("order-processor") % 50 топика __consumer_offsetsПроцесс перераспределения партиций между потребителями в группе при изменении её состава (присоединение, выход, сбой) или количества партиций. Во время ребалансировки все потребители группы приостанавливают обработку. Eager rebalancing отзывает все назначения и перераспределяет заново. Cooperative Sticky rebalancing (ICKP) перераспределяет только изменившиеся партиции, минимизируя простой.
# Настройка Cooperative Sticky Assignor (минимальный простой)
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
CooperativeStickyAssignor.class.getName());
# Настройка таймаутов сессии
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 45000); // 45 сек
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 15000); // 15 сек
# Потребитель должен отправлять heartbeat каждые 15 сек,
# иначе через 45 сек координатор инициирует ребалансировкуРежим потребителя с фиксированным идентификатором group.instance.id, позволяющий избежать ребалансировки при кратковременных перезапусках. Потребитель со статическим ID сохраняет своё назначение партиций в течение session.timeout.ms после отключения. При перезапуске он присоединяется к группе с теми же партициями без полной ребалансировки. Полезно для stateful-потребителей.
# Конфигурация статического членства
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "consumer-pod-3");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 300000); // 5 минут
# При рестарте пода consumer-pod-3 в течение 5 минут
# он снова получит те же партиции без ребалансировки
# Идеально для Kafka Streams и stateful обработчиковЗапись текущей позиции потребителя в партиции в топик __consumer_offsets. Автоматический коммит (enable.auto.commit=true) выполняется каждые auto.commit.interval.ms — прост, но может привести к дублированию при сбое. Ручной коммит (commitSync или commitAsync) даёт точный контроль над тем, когда запись считается обработанной. Зафиксированное смещение = следующая запись для чтения при перезапуске.
// Ручной коммит после гарантированной обработки
consumer.poll(Duration.ofMillis(100)).forEach(record -> {
try {
processRecord(record);
// Асинхронный коммит для производительности
consumer.commitAsync((offsets, exception) -> {
if (exception != null) log.error("Commit failed", exception);
});
} catch (Exception e) {
// Синхронный коммит при завершении для надёжности
consumer.commitSync();
throw e;
}
});Политика хранения, при которой Kafka сохраняет только последнюю запись для каждого уникального ключа. Cleaner-потоки брокера периодически объединяют сегменты лога, удаляя устаревшие версии ключей. Итоговый лог содержит полный снимок текущего состояния системы. Подходит для топиков с семантикой event-sourcing: топик изменений конфигурации, changelog Kafka Streams.
# Создание топика с уплотнением лога
kafka-topics.sh --bootstrap-server localhost:9092 \
--create \
--topic user-profiles \
--config cleanup.policy=compact \
--config min.cleanable.dirty.ratio=0.1 \
--config delete.retention.ms=86400000
# Запись с value=null (tombstone) удаляет ключ
producer.send(new ProducerRecord<>("user-profiles", "user-42", null));Функция (с Kafka 3.6+), позволяющая переносить старые сегменты лога в объектное хранилище (S3, GCS, Azure Blob) при сохранении горячих данных на локальном диске брокера. Брокеры хранят только локальный кеш последних сегментов, обеспечивая прозрачный доступ к архивным данным. Снижает затраты на хранение и позволяет хранить данные практически неограниченно.
# server.properties — включение tiered storage
remote.log.storage.system.enable=true
remote.log.manager.task.interval.ms=30000
# Плагин для S3 (конкретная реализация зависит от провайдера)
remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.RemoteLogStorageManager
# Конфигурация топика
kafka-configs.sh --alter --topic events \
--add-config remote.storage.enable=true,\
local.retention.ms=86400000Оптимизация передачи данных в ядре Linux, используемая Kafka при отправке данных потребителям. Вместо копирования данных из дискового кеша ОС в пространство пользователя и обратно, Kafka использует системный вызов sendfile(), который передаёт данные напрямую из кеша страниц в сетевой буфер. Это снижает нагрузку на CPU и увеличивает пропускную способность в разы.
# Путь данных без zero-copy (4 копирования):
# Диск -> OS Page Cache -> User Space Buffer -> Kernel Socket Buffer -> NIC
# Путь данных с zero-copy (2 копирования, sendfile):
# Диск -> OS Page Cache -> NIC
# В Java: FileChannel.transferTo() использует sendfile()
# Kafka автоматически применяет zero-copy для FileChannel
# ВАЖНО: работает только без шифрования SSL на уровне брокераБинарный протокол запросов/ответов для взаимодействия клиентов с брокерами и брокеров между собой. Каждый запрос имеет 2-байтный API Key (например, Produce=0, Fetch=1), версию API и correlation ID для сопоставления ответов. Kafka поддерживает версионирование API: новые клиенты могут работать со старыми брокерами и наоборот через согласование версии.
# Структура запроса Produce (упрощённо)
# [4 байта] Length
# [2 байта] API Key = 0 (Produce)
# [2 байта] API Version = 9
# [4 байта] Correlation ID
# [2 байта] Client ID length + данные
# [body] Topic name, acks, timeout, records
# Просмотр поддерживаемых API брокером
kafka-broker-api-versions.sh --bootstrap-server localhost:9092Фреймворк для масштабируемой и надёжной интеграции Kafka с внешними системами без написания кода. Source-коннекторы читают данные из внешних систем (базы данных, файловые системы) и публикуют в Kafka. Sink-коннекторы читают из Kafka и записывают во внешние системы (Elasticsearch, S3, PostgreSQL). Connect поддерживает горизонтальное масштабирование через воркеры.
# Конфигурация JDBC Source коннектора
curl -X POST http://localhost:8083/connectors \
-H 'Content-Type: application/json' \
-d '{
"name": "postgres-orders",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://localhost/shop",
"table.whitelist": "orders",
"mode": "timestamp+incrementing",
"timestamp.column.name": "updated_at",
"incrementing.column.name": "id",
"topic.prefix": "pg."
}
}'Лёгкие преобразования, применяемые к каждой записи в конвейере Kafka Connect без написания кастомного коннектора. SMT работают непосредственно на воркере Connect и выполняются до/после коннектора. Примеры: ReplaceField (переименование полей), MaskField (маскирование данных), ExtractField (извлечение вложенного поля), InsertField (добавление метаданных). Для сложных преобразований рекомендуется Kafka Streams.
# Конфигурация SMT в коннекторе
# Маскировка PII + добавление топика как поля
transforms=maskPII,addTopic
transforms.maskPII.type=org.apache.kafka.connect.transforms.MaskField$Value
transforms.maskPII.fields=email,phone
transforms.addTopic.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.addTopic.topic.field=_kafka_topic
# Для условного применения:
transforms.maskPII.predicate=isProduction
predicates.isProduction.type=org.apache.kafka.connect.transforms.predicates.HasHeaderKey
predicates.isProduction.name=envТопик Kafka, в который Kafka Connect направляет записи, которые не удалось обработать. При ошибке десериализации, преобразования или записи во внешнюю систему запись помещается в DLQ вместо остановки коннектора. Заголовки записи содержат информацию об ошибке (тип исключения, трассировку стека, имя коннектора). DLQ позволяет команде исследовать проблемные записи без потери данных.
# Включение DLQ в конфигурации sink-коннектора
errors.tolerance=all
errors.deadletterqueue.topic.name=orders-dlq
errors.deadletterqueue.topic.replication.factor=3
errors.deadletterqueue.context.headers.enable=true
errors.log.enable=true
errors.log.include.messages=true
# Чтение DLQ для диагностики
kafka-console-consumer.sh \
--topic orders-dlq \
--property print.headers=true \
--bootstrap-server localhost:9092Централизованный сервис для управления схемами сообщений (Avro, Protobuf, JSON Schema). Каждой схеме присваивается числовой ID; продюсеры записывают в топик Magic Byte (0x0) + 4-байтный schema ID + payload. Потребители используют schema ID для получения схемы и десериализации. Schema Registry версионирует схемы в рамках subject и применяет правила совместимости при регистрации новой версии.
# Регистрация схемы Avro через REST API
curl -X POST http://schema-registry:8081/subjects/orders-value/versions \
-H 'Content-Type: application/vnd.schemaregistry.v1+json' \
-d '{"schema": "{\"type\":\"record\",\"name\":\"Order\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}"}
# Wire Format: [0x00][schema_id: 4 bytes][avro payload]Компактный бинарный формат сериализации с поддержкой эволюции схемы, наиболее часто используемый с Kafka и Schema Registry. Avro-запись не содержит имён полей — они определяются схемой, что делает payload минимальным. Поддерживает добавление/удаление необязательных полей с default-значениями без нарушения совместимости. Поддержка Schema Registry встроена в confluent-kafka-avro клиенты.
# Схема Avro (orders.avsc)
{
"type": "record",
"name": "Order",
"namespace": "com.example",
"fields": [
{"name": "id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "status", "type": "string", "default": "PENDING"}
]
}
# Эволюция: добавление поля с default не нарушает
# обратную совместимость (BACKWARD mode)Правила Schema Registry, определяющие, какие изменения схемы допустимы при регистрации новой версии. BACKWARD: новая версия может читать данные, записанные старой. FORWARD: старая версия может читать данные, записанные новой. FULL: совместима в обоих направлениях. BACKWARD_TRANSITIVE: совместима со всеми предыдущими версиями, а не только с предыдущей. По умолчанию используется BACKWARD.
# Установка режима совместимости для subject
curl -X PUT http://schema-registry:8081/config/orders-value \
-H 'Content-Type: application/vnd.schemaregistry.v1+json' \
-d '{"compatibility": "BACKWARD"}'
# Таблица допустимых изменений при BACKWARD:
# Добавить опциональное поле (с default) — РАЗРЕШЕНО
# Удалить поле — РАЗРЕШЕНО
# Добавить обязательное поле — ЗАПРЕЩЕНО
# Изменить тип поля — ЗАПРЕЩЕНОАбстракция Kafka Streams, представляющая бесконечный поток записей, где каждая запись интерпретируется как независимое событие. KStream поддерживает stateless-операции (filter, map, flatMap) и stateful-операции (aggregate, join, windowed operations). При материализации KStream создаёт новый топик с результатами. Все записи сохраняются, даже с одинаковым ключом.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Order> orders =
builder.stream("raw-orders");
// Stateless трансформация
KStream<String, EnrichedOrder> enriched = orders
.filter((key, order) -> order.getAmount() > 0)
.mapValues(order -> enrich(order));
// Запись в новый топик
enriched.to("enriched-orders",
Produced.with(Serdes.String(), enrichedSerde));Абстракция Kafka Streams, представляющая changelog-поток, материализованный как таблица: для каждого ключа хранится только последнее значение. Запись с null-значением воспринимается как удаление ключа (tombstone). KTable хранит состояние локально в RocksDB и реплицирует его через changelog-топик. Joins с KTable — это lookup по ключу в текущем снимке состояния.
StreamsBuilder builder = new StreamsBuilder();
// KTable из топика user-profiles
KTable<String, UserProfile> profiles =
builder.table("user-profiles",
Materialized.as("profiles-store"));
// Join KStream с KTable
KStream<String, EnrichedOrder> enriched = orders.join(
profiles,
(order, profile) -> new EnrichedOrder(order, profile.getName())
);
// Join мгновенный: lookup по ключу в локальном state storeВариант KTable, в котором каждый экземпляр приложения хранит полную копию таблицы, а не только партиции, назначенные ему. Позволяет выполнять join по произвольному атрибуту (не обязательно по ключу раздела). Полезен для небольших справочных таблиц (например, конфигурация, коды стран), которые нужно иметь на каждом узле. Недостаток: потребляет больше памяти и сети.
StreamsBuilder builder = new StreamsBuilder();
// GlobalKTable реплицируется на все экземпляры
GlobalKTable<String, Product> products =
builder.globalTable("product-catalog");
// Можно join по любому полю значения, не только по ключу
KStream<String, EnrichedOrder> enriched = orders.join(
products,
(orderKey, order) -> order.getProductId(), // извлечение ключа для join
(order, product) -> new EnrichedOrder(order, product)
);Локальное хранилище ключ-значение в Kafka Streams, поддерживающее stateful-операции: агрегации, джоины, windowing. По умолчанию реализован на базе RocksDB (persistent) или HashMap (in-memory). Каждый state store автоматически реплицируется через changelog-топик, что позволяет восстановить состояние при перезапуске или ребалансировке. Standby-реплики обеспечивают быстрое восстановление.
// Materialized store для агрегации
KTable<String, Long> orderCounts = orders
.groupByKey()
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>
as("order-count-store")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long())
);
// Запрос состояния через Interactive Queries
ReadOnlyKeyValueStore<String, Long> store =
streams.store(StoreQueryParameters.fromNameAndType(
"order-count-store", QueryableStoreTypes.keyValueStore()));Механизм группировки событий по временным окнам для вычисления агрегатов за определённый период. Kafka Streams поддерживает четыре типа окон: Tumbling (неперекрывающиеся окна фиксированного размера), Hopping (перекрывающиеся окна), Session (динамические окна по активности), Sliding (окна фиксированного размера, скользящие по событиям). Результаты хранятся в windowed state store.
// Подсчёт заказов за каждые 5 минут (Tumbling Window)
KTable<Windowed<String>, Long> windowedCounts = orders
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.count(Materialized.as("windowed-count-store"));
// Ключ результата: Windowed{key="user-1", window=[12:00, 12:05)}
// Hopping: окно 5 мин, шаг 1 мин — записи считаются в нескольких окнах
TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofSeconds(30))
.advanceBy(Duration.ofMinutes(1));SQL-движок для потоковой обработки на базе Kafka Streams, позволяющий писать потоковые запросы на SQL без написания Java-кода. Создаёт STREAM (аналог KStream) и TABLE (аналог KTable) поверх топиков Kafka. CSAS (CREATE STREAM AS SELECT) и CTAS (CREATE TABLE AS SELECT) запускают постоянные потоковые вычисления. Поддерживает push-запросы (подписка на поток изменений) и pull-запросы (точечный lookup по состоянию).
-- Создание стрима поверх топика
CREATE STREAM orders (
order_id VARCHAR KEY,
amount DOUBLE,
user_id VARCHAR
) WITH (
KAFKA_TOPIC='raw-orders',
VALUE_FORMAT='AVRO'
);
-- Постоянное агрегирование (CSAS)
CREATE TABLE order_totals AS
SELECT user_id, SUM(amount) AS total
FROM orders
GROUP BY user_id
EMIT CHANGES;Механизм Kafka Streams для запроса текущего состояния локальных state stores через HTTP или gRPC без чтения из Kafka. Каждый экземпляр приложения может ответить на запросы к своим локальным партициям. Для распределённых state stores клиент должен найти нужный экземпляр через метаданные (streamsMetadataForStore) и выполнить RPC-запрос к нему. Используется для serving layer в Lambda/Kappa архитектурах.
// Экспозиция Interactive Queries через REST
var queryableStore = streams.store(
StoreQueryParameters.fromNameAndType(
"order-count-store",
QueryableStoreTypes.keyValueStore()
)
);
// Получить метаданные (какой экземпляр хранит ключ)
KeyQueryMetadata metadata = streams.queryMetadataForKey(
"order-count-store", "user-42", Serdes.String().serializer()
);
// metadata.activeHost() -> {host, port} нужного инстансаФреймворк аутентификации, используемый Kafka для проверки подлинности клиентов и брокеров. Kafka поддерживает несколько механизмов SASL: PLAIN (логин/пароль в открытом виде, только поверх TLS), SCRAM-SHA-256 и SCRAM-SHA-512 (безопасный хеш с защитой от перехвата), OAUTHBEARER (JWT-токены), GSSAPI/Kerberos (корпоративные среды с Active Directory). Выбор механизма зависит от инфраструктуры безопасности организации.
# Конфигурация брокера для SASL/SCRAM
listeners=SASL_SSL://0.0.0.0:9093
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
sasl.enabled.mechanisms=SCRAM-SHA-512
# Создание пользователя
kafka-configs.sh --bootstrap-server localhost:9092 \
--alter --add-config 'SCRAM-SHA-512=[iterations=8192,password=secret]' \
--entity-type users --entity-name alice
# Конфигурация клиента (JAAS)
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="alice" password="secret";Двусторонняя проверка подлинности, при которой и брокер, и клиент предоставляют друг другу сертификаты X.509. Клиент проверяет сертификат брокера через truststore, брокер проверяет сертификат клиента (если ssl.client.auth=required). Обеспечивает шифрование трафика и аутентификацию без дополнительного механизма SASL. Управление жизненным циклом сертификатов — главная операционная сложность.
# Конфигурация брокера для mTLS
ssl.keystore.location=/etc/kafka/ssl/kafka.server.keystore.jks
ssl.keystore.password=keystorepass
ssl.key.password=keypass
ssl.truststore.location=/etc/kafka/ssl/kafka.server.truststore.jks
ssl.truststore.password=truststorepass
ssl.client.auth=required
# Конфигурация клиента
ssl.keystore.location=/etc/kafka/ssl/client.keystore.jks
ssl.truststore.location=/etc/kafka/ssl/client.truststore.jks
security.protocol=SSLМеханизм авторизации Kafka, определяющий права доступа конкретного principal (пользователя или сервиса) к ресурсу (топику, группе, кластеру) для определённой операции (Read, Write, Create, Describe, Delete). ACL проверяется после успешной аутентификации. Хранятся в ZooKeeper или (в KRaft-режиме) в метаданных кластера. Поддерживается ALLOW и DENY правила.
# Предоставить alice право записи в топик orders
kafka-acls.sh --bootstrap-server localhost:9092 \
--add \
--allow-principal User:alice \
--operation Write \
--topic orders
# Предоставить group order-processor право чтения
kafka-acls.sh --bootstrap-server localhost:9092 \
--add \
--allow-principal User:service-account \
--operation Read \
--group order-processor
# Просмотр всех ACL
kafka-acls.sh --list --bootstrap-server localhost:9092Механизм ограничения ресурсов, потребляемых клиентами Kafka, для защиты от шумных соседей. Kafka поддерживает три типа квот: producer_byte_rate (байт/с для продюсеров), consumer_byte_rate (байт/с для потребителей), request_percentage (процент сетевых и I/O потоков). При превышении квоты брокер добавляет задержку в ответы клиента (throttling), не обрывая соединение.
# Установка квоты для конкретного пользователя
kafka-configs.sh --bootstrap-server localhost:9092 \
--alter \
--add-config 'producer_byte_rate=1048576,consumer_byte_rate=2097152' \
--entity-type users \
--entity-name alice
# Квота по умолчанию для всех клиентов
kafka-configs.sh --bootstrap-server localhost:9092 \
--alter \
--add-config 'producer_byte_rate=10485760' \
--entity-type users \
--entity-defaultРегистрация событий авторизации и операций с кластером для целей безопасности и соответствия требованиям (GDPR, PCI DSS, SOC2). В Kafka с AclAuthorizer все события ALLOW/DENY записываются через log4j в отдельный лог-файл. Kafka Enterprise-решения предлагают централизованный аудит с маршрутизацией событий в SIEM-системы. Audit-лог должен быть защищён от изменений и храниться отдельно.
# log4j.properties — включение аудит-лога
log4j.appender.authorizerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.authorizerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.authorizerAppender.File=/var/log/kafka/kafka-authorizer.log
log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.logger.kafka.authorizer.logger=INFO, authorizerAppender
log4j.additivity.kafka.authorizer.logger=falseСтандартный механизм Java для экспозиции метрик брокера через MBeans. Kafka публикует сотни метрик в пространстве имён kafka.server:type=..., kafka.network:type=..., kafka.log:type=.... Метрики доступны через JMX Exporter (Prometheus), Kafka Exporter или JConsole. Ключевые группы: BrokerTopicMetrics (throughput), ReplicaManager (ISR), NetworkProcessor (latency).
# Примеры критических JMX метрик
kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
kafka.server:type=ReplicaManager,name=OfflinePartitionsCount
kafka.controller:type=KafkaController,name=ActiveControllerCount
kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce
# Запуск JMX Exporter для Prometheus
# -javaagent:jmx_prometheus_javaagent.jar=7071:kafka-jmx-config.ymlРазница между Log End Offset (LEO) лидера партиции и последним зафиксированным смещением группы потребителей. Лаг показывает, сколько записей группа ещё не обработала. Высокий и растущий лаг сигнализирует о том, что потребители не справляются с нагрузкой. Мониторинг лага — ключевая метрика SLA для потоковых систем. Инструменты: kafka-consumer-groups.sh, Burrow, Kafka Lag Exporter.
# Просмотр лага всех групп потребителей
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--describe \
--group order-processor
# GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
# order-processor orders 0 1250 1300 50
# order-processor orders 1 980 980 0
# order-processor orders 2 700 750 50
# Общий лаг: 100 записейДинамическое изменение множества In-Sync Replicas в ответ на состояние репликации. ISR сжимается, когда реплика не отправляла Fetch-запрос лидеру дольше replica.lag.time.max.ms. ISR расширяется, когда отставшая реплика догнала лидера и несколько секунд остаётся синхронизированной. Частые изменения ISR (ISR thrashing) создают нагрузку на контроллер и сигнализируют о проблемах с производительностью реплик.
# Метрики ISR для мониторинга
kafka.server:type=ReplicaManager,name=IsrShrinksPerSec
kafka.server:type=ReplicaManager,name=IsrExpandsPerSec
kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
# Параметры, влияющие на ISR
replica.lag.time.max.ms=30000 # реплика выбывает через 30 сек
replica.fetch.min.bytes=1
replica.fetch.wait.max.ms=500
# Алерт: UnderReplicatedPartitions > 0 дольше 5 минутОперация переноса лидерства или реплик партиций между брокерами для балансировки нагрузки или замены оборудования. Выполняется через kafka-reassign-partitions.sh или автоматически через Cruise Control. Во время переноса происходит межброкерская репликация, которую необходимо ограничивать (throttling) для предотвращения влияния на производительность клиентов.
# Шаг 1: Сгенерировать план перераспределения
kafka-reassign-partitions.sh \
--bootstrap-server localhost:9092 \
--generate \
--topics-to-move-json-file topics.json \
--broker-list "1,2,3,4" > reassignment.json
# Шаг 2: Применить с троттлингом
kafka-reassign-partitions.sh \
--bootstrap-server localhost:9092 \
--execute \
--reassignment-json-file reassignment.json \
--throttle 50000000 # 50 MB/s максимумСтандартный стек мониторинга Kafka в production. JMX Prometheus Exporter или Kafka Exporter (для consumer lag) собирают метрики и предоставляют их по HTTP. Prometheus скрапирует метрики каждые 15-60 секунд. Grafana визуализирует данные в дашбордах. Готовые дашборды: Kafka Overview (Grafana ID 7589), Consumer Lag Exporter (ID 12483). Alertmanager отправляет уведомления при нарушении пороговых значений.
# docker-compose.yml фрагмент
services:
jmx-exporter:
image: bitnami/jmx-exporter
command: ["5556", "/etc/config/kafka.yml"]
volumes:
- ./kafka-jmx-config.yml:/etc/config/kafka.yml
prometheus:
image: prom/prometheus
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
grafana:
image: grafana/grafana
# Импорт дашборда: grafana.com/dashboards/7589Методика расчёта необходимых ресурсов кластера Kafka на основе ожидаемой нагрузки. Ключевые параметры: пропускная способность (MB/s записи = throughput × replication_factor), объём хранения (throughput × retention × replication_factor), сетевая пропускная способность (запись + чтение × количество групп потребителей). Правило большого пальца: 80% утилизация дискового I/O и сети как максимум.
# Формулы расчёта
# Требуемый дисковый I/O на запись:
# write_throughput * replication_factor
# Пример: 100 MB/s * 3 = 300 MB/s
# Требуемое хранилище:
# write_throughput * retention_hours * 3600 * replication_factor
# Пример: 100 MB/s * 168h * 3600 * 3 = ~181 TB
# Сетевой трафик на брокер (лидер партиции):
# write + read * num_consumer_groups
# = 100 + 100 * 3 = 400 MB/s исходящихИнструмент для репликации данных между кластерами Kafka, построенный на базе Kafka Connect. Поддерживает топологии active-passive (односторонняя репликация для DR) и active-active (двусторонняя для геораспределённых сред). MirrorMaker 2 сохраняет смещения потребителей через трансляцию (offset translation), позволяя группам переключаться между кластерами без потери позиции. Автоматически создаёт топики с префиксом источника.
# mm2.properties — конфигурация репликации active-passive
clusters=primary, secondary
primary.bootstrap.servers=kafka-primary:9092
secondary.bootstrap.servers=kafka-secondary:9092
# Репликация из primary в secondary
primary->secondary.enabled=true
primary->secondary.topics=orders.*,payments.*
# Синхронизация смещений потребителей
primary->secondary.sync.group.offsets.enabled=true
primary->secondary.emit.checkpoints.interval.seconds=10
# Запуск
connect-mirror-maker.sh mm2.propertiesКлючевые метрики для проектирования аварийного восстановления (DR) Kafka-кластеров. RPO (Recovery Point Objective) — максимально допустимый объём потерянных данных, выраженный во времени: сколько данных можно потерять при катастрофическом сбое. RTO (Recovery Time Objective) — максимально допустимое время восстановления сервиса. Для Kafka: RPO определяется задержкой репликации MirrorMaker 2, RTO — временем переключения клиентов на резервный кластер.
# Пример анализа RPO/RTO для Kafka DR
# Сценарий: active-passive с MirrorMaker 2
# Задержка репликации: ~5 секунд
# RPO = 5 секунд (данные за 5 сек могут быть потеряны)
# Процедура переключения:
# 1. Обнаружение сбоя primary: ~30 сек
# 2. Переключение DNS/LB: ~2 мин
# 3. Перезапуск клиентов: ~5 мин
# RTO = ~8 минут
# Для RPO=0: синхронная репликация
# (недоступна в стандартной Kafka, требует Kafka Bridge)Архитектурный паттерн, при котором состояние системы определяется не текущими значениями в базе данных, а как результат воспроизведения последовательности событий. Kafka выступает журналом событий: каждое изменение состояния публикуется как неизменяемое событие. Текущее состояние восстанавливается воспроизведением событий от начала лога (или от снимка). Обеспечивает полный аудит, возможность временного перемотки и воспроизводимость.
// Паттерн Event Sourcing с Kafka
// Команды -> события -> проекции
// Событие (неизменяемое)
public record OrderPlaced(
String orderId,
String userId,
List<OrderItem> items,
double totalAmount,
Instant timestamp
) {}
// Публикация события
producer.send(new ProducerRecord<>(
"order-events",
orderId, // ключ для партиционирования
new OrderPlaced(orderId, userId, items, total, Instant.now())
));
// Проекция: восстановление состояния через Kafka Streams KTableПаттерн разделения команд (изменений состояния) и запросов (чтений) на разные модели. В связке с Kafka: команды публикуются как события в топик, потребители проецируют события в оптимизированные для чтения хранилища (Elasticsearch для поиска, PostgreSQL для транзакций, Redis для кеша). Kafka Streams или ksqlDB создают и поддерживают материализованные представления в актуальном состоянии.
// CQRS с Kafka
// Write side: команды -> события
kafka-topic: order-events (append-only event log)
// Read side: несколько проекций для разных сценариев
// 1. PostgreSQL: текущий статус заказа (по ID)
// 2. Elasticsearch: поиск заказов по атрибутам
// 3. Redis: корзина пользователя (TTL)
// 4. KTable в Kafka Streams: агрегаты для дашборда
// Connect Sink обновляет все проекции
// асинхронно из топика order-eventsПаттерн надёжной публикации событий без распределённых транзакций между базой данных и Kafka. Приложение в одной локальной транзакции записывает изменение данных и событие в таблицу outbox той же базы данных. Отдельный процесс (или Debezium CDC) читает таблицу outbox и публикует события в Kafka. Гарантирует, что событие будет опубликовано тогда и только тогда, когда транзакция в БД зафиксирована.
-- Схема outbox таблицы
CREATE TABLE outbox (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
topic VARCHAR NOT NULL,
key VARCHAR,
payload JSONB NOT NULL,
created_at TIMESTAMP DEFAULT NOW(),
sent_at TIMESTAMP -- NULL = ещё не отправлено
);
-- В одной транзакции с бизнес-логикой
BEGIN;
UPDATE orders SET status = 'CONFIRMED' WHERE id = $1;
INSERT INTO outbox (topic, key, payload)
VALUES ('order-events', $1, '{"event": "OrderConfirmed", ...}');
COMMIT;Паттерн управления распределёнными транзакциями через последовательность локальных транзакций, координируемых через события. При сбое на любом шаге выполняются компенсирующие транзакции в обратном порядке. Хореография (choreography): сервисы реагируют на события друг друга. Оркестрация (orchestration): центральный координатор управляет порядком шагов. Kafka служит шиной событий между сервисами Saga.
// Saga хореография: оформление заказа
// 1. OrderService: OrderPlaced -> orders-events
// 2. PaymentService: слушает orders-events,
// резервирует оплату -> PaymentReserved
// 3. InventoryService: слушает PaymentReserved,
// резервирует товар -> InventoryReserved
// 4. ShippingService: создаёт доставку -> OrderCompleted
// Компенсация при ошибке на шаге 3:
// InventoryService: InventoryFailed ->
// PaymentService: отменяет резервированиеНабор политик, соглашений и процессов для управления жизненным циклом топиков в масштабируемой Kafka-среде. Включает: соглашения об именовании (домен.сущность.событие.версия), управление схемами через Schema Registry, политики доступа через ACL, определение владельцев топиков, политики хранения и архивирования, процессы создания и удаления топиков. Предотвращает неконтролируемый рост числа топиков.
# Соглашение об именовании топиков
# формат: {domain}.{entity}.{event}.{version}
# Примеры:
ordersvc.order.placed.v1
ordersvc.order.shipped.v1
payment.transaction.completed.v2
analytics.user.session.ended.v1
# Метаданные топика (через Custom Config)
kafka-configs.sh --bootstrap-server localhost:9092 \
--alter --topic ordersvc.order.placed.v1 \
--add-config 'owner=orders-team,\
[email protected],\
sla-tier=gold'