Learning Platform
Troubleshooting
Глоссарий

Глоссарий — Apache Kafka

Справочник ключевых терминов курса Apache Kafka.

10 категорий · 59 терминов

Основы Kafka

Распределённый журнал фиксации

Distributed Commit Log
Термин

Фундаментальная абстракция Apache Kafka: упорядоченная, неизменяемая последовательность записей, реплицированная по нескольким брокерам. Каждая запись добавляется в конец лога и сохраняет своё смещение навсегда. Именно эта модель отличает Kafka от традиционных очередей сообщений: потребители читают лог независимо, не удаляя записи.

Пример:
# Kafka как commit log:
# Запись добавляется в конец, offset монотонно растёт
# offset: 0  1  2  3  4  5  6
# data:   A  B  C  D  E  F  G
#                          ^--- Log End Offset (LEO)
# Consumer group A читает с offset 3
# Consumer group B читает с offset 6
# Записи не удаляются после чтения
Подробнее в уроках:

Брокер

Broker
Термин

Отдельный процесс сервера Kafka, хранящий данные и обслуживающий запросы продюсеров и потребителей. Каждый брокер идентифицируется уникальным числом broker.id и обслуживает набор партиций. Брокеры образуют кластер, обмениваясь метаданными через протокол KRaft или (исторически) через ZooKeeper.

Пример:
# server.properties — базовая конфигурация брокера
broker.id=1
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://kafka-1.example.com:9092
log.dirs=/var/kafka/data
num.partitions=3
default.replication.factor=3
min.insync.replicas=2
Подробнее в уроках:

Топик

Topic
Термин

Именованный логический канал для организации потока событий в Kafka. Топик разбивается на партиции, каждая из которых является независимым упорядоченным логом. Топик не привязан к конкретному брокеру: его партиции могут быть распределены по всему кластеру. Retention-политика определяет, как долго хранятся записи.

Пример:
# Создание топика через CLI
kafka-topics.sh --bootstrap-server localhost:9092 \
  --create \
  --topic orders \
  --partitions 12 \
  --replication-factor 3 \
  --config retention.ms=604800000 \
  --config cleanup.policy=delete
Подробнее в уроках:

Партиция

Partition
Термин

Базовая единица параллелизма и хранения в Kafka. Партиция представляет собой упорядоченный, только-для-добавления лог записей. Каждая партиция хранится на одном брокере-лидере и реплицируется на несколько брокеров-фолловеров. Количество партиций определяет максимальный параллелизм чтения в потребительской группе.

Пример:
# Топик 'orders' с 3 партициями на кластере из 3 брокеров:
# Partition 0: Leader=broker-1, Replicas=[1,2,3], ISR=[1,2,3]
# Partition 1: Leader=broker-2, Replicas=[2,3,1], ISR=[2,3,1]
# Partition 2: Leader=broker-3, Replicas=[3,1,2], ISR=[3,1,2]

kafka-topics.sh --describe --topic orders --bootstrap-server localhost:9092
Подробнее в уроках:

Смещение

Offset
Термин

Уникальный монотонно возрастающий целочисленный идентификатор записи внутри конкретной партиции. Смещение позволяет потребителям точно отслеживать позицию чтения: в случае перезапуска потребитель возобновляет чтение с последнего зафиксированного смещения. Смещения хранятся во внутреннем топике __consumer_offsets.

Пример:
# Фиксация смещения вручную в Java
consumer.poll(Duration.ofMillis(100)).forEach(record -> {
    processRecord(record);
    // Фиксация после обработки каждой записи
    consumer.commitSync(Collections.singletonMap(
        new TopicPartition(record.topic(), record.partition()),
        new OffsetAndMetadata(record.offset() + 1)
    ));
});
Подробнее в уроках:

Сегмент лога

Log Segment
Термин

Физический файл на диске, в котором хранится часть записей партиции. Каждая партиция состоит из одного активного и нескольких закрытых сегментов. Kafka создаёт новый сегмент при достижении порога log.segment.bytes или log.roll.ms. К каждому сегменту прилагаются индексные файлы (.index, .timeindex) для быстрого поиска по смещению и времени.

Пример:
# Структура файлов партиции на диске
/var/kafka/data/orders-0/
  00000000000000000000.log       # сегмент от offset 0
  00000000000000000000.index     # индекс смещений
  00000000000000000000.timeindex # индекс по времени
  00000000000001048576.log       # следующий сегмент
  00000000000001048576.index
  00000000000001048576.timeindex
Подробнее в уроках:

Кластер Kafka

Kafka Cluster
Термин

Совокупность брокеров Kafka, совместно обслуживающих топики и обеспечивающих высокую доступность. В режиме KRaft часть брокеров выполняет роль контроллеров, формируя кворум метаданных. Клиенты подключаются к любому брокеру через bootstrap-список и автоматически получают актуальные метаданные обо всём кластере.

Пример:
# Подключение клиента к кластеру
bootstrap.servers=kafka-1:9092,kafka-2:9092,kafka-3:9092

# Клиент запрашивает метаданные у первого доступного брокера
# и узнаёт адреса всех брокеров и лидеров партиций
# Последующие запросы идут напрямую к нужному брокеру
Подробнее в уроках:

Репликация и консенсус

ISR (In-Sync Replicas)

ISR (In-Sync Replicas)
Термин

Множество реплик партиции, которые отстают от лидера не более чем на replica.lag.time.max.ms миллисекунд. Только записи, подтверждённые всеми репликами из ISR, считаются зафиксированными (committed). Если реплика выходит из ISR, лидер может продолжить работу; если ISR сжимается ниже min.insync.replicas, лидер начинает отклонять записи продюсеров с acks=all.

Пример:
# Мониторинг ISR через CLI
kafka-topics.sh --describe --topic orders --bootstrap-server localhost:9092
# Topic: orders  Partition: 0  Leader: 1  Replicas: 1,2,3  ISR: 1,2  Offline: 3
# ^ broker-3 выпал из ISR — под-реплицированная партиция!

# JMX метрика: kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
Подробнее в уроках:

High Watermark (HW)

High Watermark (HW)
Термин

Максимальное смещение в партиции, которое подтверждено всеми репликами из ISR. Потребители не могут прочитать записи выше High Watermark: это гарантирует, что они видят только зафиксированные данные, которые не будут потеряны при сбое лидера. Лидер периодически сообщает фолловерам актуальное значение HW через ответы на Fetch-запросы.

Пример:
# Соотношение LEO и HW:
# Leader LEO = 10, Follower-1 LEO = 10, Follower-2 LEO = 8
# ISR = {Leader, Follower-1, Follower-2}
# HW = min(LEO всех в ISR) = 8
# Потребитель прочитает только offset 0..7
# После репликации Follower-2: HW продвинется до 10

# JMX: kafka.log:type=Log,name=LogEndOffset,topic=orders,partition=0
Подробнее в уроках:

LEO (Log End Offset)

LEO (Log End Offset)
Термин

Смещение следующей записи, которая будет добавлена в лог партиции на данной реплике. LEO на лидере всегда больше или равен LEO любого фолловера. Разница между LEO лидера и LEO фолловера определяет отставание реплики. Когда все реплики из ISR достигают одного LEO, High Watermark продвигается до этого значения.

Пример:
# Состояние реплик партиции
# Запись смещений на каждой реплике:
# Leader:     LEO=15  (принял все записи)
# Follower-1: LEO=15  (в ISR, в синхронизации)
# Follower-2: LEO=12  (отстаёт, ещё в ISR)
# Follower-3: LEO=5   (выбыл из ISR)
# HW = 12 (min LEO среди ISR)
# Лаг Follower-2: 15 - 12 = 3 записи
Подробнее в уроках:

Leader Epoch (эпоха лидера)

Leader Epoch
Термин

Монотонно возрастающий счётчик, увеличивающийся при каждой смене лидера партиции. Epoch служит fencing-токеном: новый лидер использует epoch для обнаружения и отклонения устаревших запросов от бывшего лидера. Это предотвращает сценарий split-brain, когда два брокера одновременно считают себя лидером и могут записывать конфликтующие данные.

Пример:
# Формат ответа на Fetch-запрос фолловера
# LeaderEpoch передаётся с каждым ответом
# Фолловер проверяет: если epoch изменился, нужно усечь лог

# Файл leader-epoch-checkpoint хранит историю
# /var/kafka/data/orders-0/leader-epoch-checkpoint
# 0
# 3  (epoch=3 начался с offset=0)
# 5  (epoch=5 начался с offset=200)
# Epoch 5 — текущий лидер
Подробнее в уроках:

KRaft (Kafka Raft)

KRaft (Kafka Raft)
Термин

Встроенный протокол консенсуса Kafka (с версии 2.8, обязателен с 4.0), заменивший ZooKeeper. Метаданные кластера хранятся во внутреннем топике __cluster_metadata с фактором репликации, равным количеству контроллеров. Алгоритм Raft обеспечивает консенсус между контроллерами без внешних зависимостей, упрощая развёртывание и снижая задержку операций с метаданными.

Пример:
# KRaft-конфигурация для комбинированного узла
# (брокер + контроллер)
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
listeners=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
controller.listener.names=CONTROLLER

# Инициализация кластера
kafka-storage.sh format -t <cluster-id> -c server.properties
Подробнее в уроках:

Controller Quorum (кворум контроллеров)

Controller Quorum
Термин

Группа KRaft-контроллеров, которые через протокол Raft управляют метаданными кластера: списком брокеров, конфигурациями топиков, назначением лидеров партиций. Один контроллер является активным (active controller), остальные — резервными (standby). Топик __cluster_metadata хранит все изменения в виде event log.

Пример:
# Проверка статуса кворума контроллеров
kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --status
# ClusterId:              abc123xyz
# LeaderId:               1
# LeaderEpoch:            5
# HighWatermark:          1248
# MaxFollowerLag:         0
# MaxFollowerLagTimeMs:   120
# CurrentVoters:          [1,2,3]
# CurrentObservers:       []
Подробнее в уроках:

Нечистые выборы лидера

Unclean Leader Election
Термин

Выбор нового лидера партиции из реплик, не входящих в ISR, при отсутствии доступных синхронизированных реплик. Включается параметром unclean.leader.election.enable=true. Обеспечивает доступность ценой возможной потери данных: новый лидер не имеет всех зафиксированных записей и перезапишет часть лога. По умолчанию отключён для систем, требующих гарантий сохранности данных.

Пример:
# Параметр топика — включить только при критической необходимости
kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics \
  --entity-name orders \
  --alter \
  --add-config unclean.leader.election.enable=true

# Риск: если было 5 незафиксированных записей,
# после выборов нечистого лидера они будут потеряны
# и потребитель увидит дыру или перечитает старые данные
Подробнее в уроках:

Producers

Record Accumulator (аккумулятор записей)

Record Accumulator
Термин

Буфер в памяти продюсера, группирующий записи в батчи перед отправкой на брокер. Для каждой пары (топик, партиция) поддерживается своя очередь ProducerBatch. Запись считается готовой к отправке, когда батч достиг batch.size байт или истёк linger.ms. Record Accumulator — главный механизм повышения пропускной способности продюсера.

Пример:
# Настройка аккумулятора для высокой пропускной способности
Properties props = new Properties();
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);     // 64 KB
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);         // ждать до 10 мс
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 32 MB общий буфер
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // сжатие батча
// При linger.ms=10 продюсер агрегирует записи 10 мс,
// затем отправляет один большой батч вместо многих мелких
Подробнее в уроках:

Идемпотентный продюсер

Idempotent Producer
Термин

Режим работы продюсера, гарантирующий доставку каждой записи ровно один раз (exactly once) в одну партицию. Брокер присваивает продюсеру уникальный Producer ID (PID) и отслеживает порядковый номер (sequence number) каждой записи. Дублирующиеся записи с одинаковым PID и sequence отбрасываются. Включается параметром enable.idempotence=true (по умолчанию с Kafka 3.0).

Пример:
Properties props = new Properties();
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Автоматически устанавливает:
// acks=all, retries=Integer.MAX_VALUE, max.in.flight=5

// Брокер хранит: {PID=42, partition=0, lastSeq=100}
// Запись с seq=100 при повторе будет отброшена
// Запись с seq=101 будет принята как новая
Подробнее в уроках:

Транзакции Kafka

Kafka Transactions
Термин

Механизм атомарной записи в несколько партиций и топиков в рамках одной транзакции. Используется для реализации exactly-once semantics (EOS) в сценариях consume-transform-produce. Продюсер с transactional.id взаимодействует с transaction coordinator — специальным брокером, хранящим состояние транзакции в топике __transaction_state.

Пример:
producer.initTransactions();
try {
    producer.beginTransaction();
    // Запись в несколько топиков атомарна
    producer.send(new ProducerRecord<>("orders", key, value));
    producer.send(new ProducerRecord<>("audit-log", key, audit));
    // Фиксация смещений потребителя в рамках транзакции
    producer.sendOffsetsToTransaction(offsets, consumerGroupMetadata);
    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();
}
Подробнее в уроках:

Acks (подтверждения)

Acks (Acknowledgments)
Термин

Параметр продюсера, определяющий, сколько реплик должны подтвердить получение записи, прежде чем брокер ответит продюсеру об успехе. Три значения: acks=0 — не ждать подтверждения (максимальная скорость, возможна потеря); acks=1 — ждать только лидера (риск потери при немедленном падении); acks=all (или -1) — ждать всех реплик из ISR (максимальная надёжность).

Пример:
# acks=all + min.insync.replicas=2 — золотой стандарт
# server.properties (глобально)
default.replication.factor=3
min.insync.replicas=2

# producer.properties
acks=all
# Семантика: запись подтверждена, если 2 из 3 реплик
# записали её на диск. При падении одного брокера
# данные не потеряны — у двух других есть копия
Подробнее в уроках:

Партиционирование по ключу

Key-Based Partitioning
Термин

Стратегия выбора партиции для записи на основе ключа сообщения. DefaultPartitioner применяет алгоритм murmur2 к ключу и вычисляет номер партиции: partition = murmur2(key) % numPartitions. Записи с одинаковым ключом гарантированно попадают в одну партицию, что обеспечивает упорядоченность для конкретного ключа. При null-ключе используется sticky partitioner.

Пример:
// Запись с ключом — гарантия порядка для user_id
ProducerRecord<String, Order> record = new ProducerRecord<>(
    "orders",
    "user-42",  // ключ — всегда та же партиция
    new Order(orderId, items)
);
producer.send(record);

// Кастомный партиционер для равномерного распределения
public class RoundRobinPartitioner implements Partitioner {
    public int partition(String topic, Object key,
        byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        return (int)(counter.getAndIncrement() % cluster.partitionCountForTopic(topic));
    }
}
Подробнее в уроках:

Consumers

Consumer Group (группа потребителей)

Consumer Group
Термин

Логическая группа потребителей с общим group.id, совместно читающих топик. Каждая партиция назначается ровно одному потребителю внутри группы, обеспечивая параллельную обработку без дублирования. Несколько групп могут независимо читать один топик — каждая получает полную копию данных. Текущая позиция каждой группы в каждой партиции хранится в топике __consumer_offsets.

Пример:
Properties props = new Properties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("orders"));
// Все экземпляры с group.id="order-processor" делят партиции
Подробнее в уроках:

Group Coordinator (координатор группы)

Group Coordinator
Термин

Специальный брокер, ответственный за управление конкретной группой потребителей: отслеживание живых членов, инициирование ребалансировок и хранение зафиксированных смещений. Координатор определяется по hash(group.id) % numPartitions(__consumer_offsets). Каждый потребитель при старте регистрируется у своего координатора через запрос FindCoordinator.

Пример:
# Определение брокера-координатора для группы
kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --describe \
  --group order-processor

# Вывод: GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
# Coordinator находится на брокере, ответственном за
# партицию hash("order-processor") % 50 топика __consumer_offsets
Подробнее в уроках:

Ребалансировка

Rebalancing
Термин

Процесс перераспределения партиций между потребителями в группе при изменении её состава (присоединение, выход, сбой) или количества партиций. Во время ребалансировки все потребители группы приостанавливают обработку. Eager rebalancing отзывает все назначения и перераспределяет заново. Cooperative Sticky rebalancing (ICKP) перераспределяет только изменившиеся партиции, минимизируя простой.

Пример:
# Настройка Cooperative Sticky Assignor (минимальный простой)
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
    CooperativeStickyAssignor.class.getName());

# Настройка таймаутов сессии
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 45000); // 45 сек
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 15000); // 15 сек
# Потребитель должен отправлять heartbeat каждые 15 сек,
# иначе через 45 сек координатор инициирует ребалансировку
Подробнее в уроках:

Static Membership (статическое членство)

Static Membership
Термин

Режим потребителя с фиксированным идентификатором group.instance.id, позволяющий избежать ребалансировки при кратковременных перезапусках. Потребитель со статическим ID сохраняет своё назначение партиций в течение session.timeout.ms после отключения. При перезапуске он присоединяется к группе с теми же партициями без полной ребалансировки. Полезно для stateful-потребителей.

Пример:
# Конфигурация статического членства
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "consumer-pod-3");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 300000); // 5 минут
# При рестарте пода consumer-pod-3 в течение 5 минут
# он снова получит те же партиции без ребалансировки
# Идеально для Kafka Streams и stateful обработчиков
Подробнее в уроках:

Коммит смещения

Offset Commit
Термин

Запись текущей позиции потребителя в партиции в топик __consumer_offsets. Автоматический коммит (enable.auto.commit=true) выполняется каждые auto.commit.interval.ms — прост, но может привести к дублированию при сбое. Ручной коммит (commitSync или commitAsync) даёт точный контроль над тем, когда запись считается обработанной. Зафиксированное смещение = следующая запись для чтения при перезапуске.

Пример:
// Ручной коммит после гарантированной обработки
consumer.poll(Duration.ofMillis(100)).forEach(record -> {
    try {
        processRecord(record);
        // Асинхронный коммит для производительности
        consumer.commitAsync((offsets, exception) -> {
            if (exception != null) log.error("Commit failed", exception);
        });
    } catch (Exception e) {
        // Синхронный коммит при завершении для надёжности
        consumer.commitSync();
        throw e;
    }
});
Подробнее в уроках:

Внутреннее устройство

Уплотнение лога

Log Compaction
Термин

Политика хранения, при которой Kafka сохраняет только последнюю запись для каждого уникального ключа. Cleaner-потоки брокера периодически объединяют сегменты лога, удаляя устаревшие версии ключей. Итоговый лог содержит полный снимок текущего состояния системы. Подходит для топиков с семантикой event-sourcing: топик изменений конфигурации, changelog Kafka Streams.

Пример:
# Создание топика с уплотнением лога
kafka-topics.sh --bootstrap-server localhost:9092 \
  --create \
  --topic user-profiles \
  --config cleanup.policy=compact \
  --config min.cleanable.dirty.ratio=0.1 \
  --config delete.retention.ms=86400000

# Запись с value=null (tombstone) удаляет ключ
producer.send(new ProducerRecord<>("user-profiles", "user-42", null));
Подробнее в уроках:

Tiered Storage (многоуровневое хранение)

Tiered Storage
Термин

Функция (с Kafka 3.6+), позволяющая переносить старые сегменты лога в объектное хранилище (S3, GCS, Azure Blob) при сохранении горячих данных на локальном диске брокера. Брокеры хранят только локальный кеш последних сегментов, обеспечивая прозрачный доступ к архивным данным. Снижает затраты на хранение и позволяет хранить данные практически неограниченно.

Пример:
# server.properties — включение tiered storage
remote.log.storage.system.enable=true
remote.log.manager.task.interval.ms=30000

# Плагин для S3 (конкретная реализация зависит от провайдера)
remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.RemoteLogStorageManager

# Конфигурация топика
kafka-configs.sh --alter --topic events \
  --add-config remote.storage.enable=true,\
  local.retention.ms=86400000
Подробнее в уроках:

Zero-Copy Transfer

Zero-Copy Transfer
Термин

Оптимизация передачи данных в ядре Linux, используемая Kafka при отправке данных потребителям. Вместо копирования данных из дискового кеша ОС в пространство пользователя и обратно, Kafka использует системный вызов sendfile(), который передаёт данные напрямую из кеша страниц в сетевой буфер. Это снижает нагрузку на CPU и увеличивает пропускную способность в разы.

Пример:
# Путь данных без zero-copy (4 копирования):
# Диск -> OS Page Cache -> User Space Buffer -> Kernel Socket Buffer -> NIC

# Путь данных с zero-copy (2 копирования, sendfile):
# Диск -> OS Page Cache -> NIC

# В Java: FileChannel.transferTo() использует sendfile()
# Kafka автоматически применяет zero-copy для FileChannel
# ВАЖНО: работает только без шифрования SSL на уровне брокера
Подробнее в уроках:

Wire Protocol (протокол Kafka)

Wire Protocol
Термин

Бинарный протокол запросов/ответов для взаимодействия клиентов с брокерами и брокеров между собой. Каждый запрос имеет 2-байтный API Key (например, Produce=0, Fetch=1), версию API и correlation ID для сопоставления ответов. Kafka поддерживает версионирование API: новые клиенты могут работать со старыми брокерами и наоборот через согласование версии.

Пример:
# Структура запроса Produce (упрощённо)
# [4 байта] Length
# [2 байта] API Key = 0 (Produce)
# [2 байта] API Version = 9
# [4 байта] Correlation ID
# [2 байта] Client ID length + данные
# [body]    Topic name, acks, timeout, records

# Просмотр поддерживаемых API брокером
kafka-broker-api-versions.sh --bootstrap-server localhost:9092
Подробнее в уроках:

Connect и Schema Registry

Kafka Connect

Kafka Connect
Термин

Фреймворк для масштабируемой и надёжной интеграции Kafka с внешними системами без написания кода. Source-коннекторы читают данные из внешних систем (базы данных, файловые системы) и публикуют в Kafka. Sink-коннекторы читают из Kafka и записывают во внешние системы (Elasticsearch, S3, PostgreSQL). Connect поддерживает горизонтальное масштабирование через воркеры.

Пример:
# Конфигурация JDBC Source коннектора
curl -X POST http://localhost:8083/connectors \
  -H 'Content-Type: application/json' \
  -d '{
    "name": "postgres-orders",
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
      "connection.url": "jdbc:postgresql://localhost/shop",
      "table.whitelist": "orders",
      "mode": "timestamp+incrementing",
      "timestamp.column.name": "updated_at",
      "incrementing.column.name": "id",
      "topic.prefix": "pg."
    }
  }'
Подробнее в уроках:

SMT (Single Message Transform)

SMT (Single Message Transform)
Термин

Лёгкие преобразования, применяемые к каждой записи в конвейере Kafka Connect без написания кастомного коннектора. SMT работают непосредственно на воркере Connect и выполняются до/после коннектора. Примеры: ReplaceField (переименование полей), MaskField (маскирование данных), ExtractField (извлечение вложенного поля), InsertField (добавление метаданных). Для сложных преобразований рекомендуется Kafka Streams.

Пример:
# Конфигурация SMT в коннекторе
# Маскировка PII + добавление топика как поля
transforms=maskPII,addTopic
transforms.maskPII.type=org.apache.kafka.connect.transforms.MaskField$Value
transforms.maskPII.fields=email,phone
transforms.addTopic.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.addTopic.topic.field=_kafka_topic
# Для условного применения:
transforms.maskPII.predicate=isProduction
predicates.isProduction.type=org.apache.kafka.connect.transforms.predicates.HasHeaderKey
predicates.isProduction.name=env
Подробнее в уроках:

Dead Letter Queue (DLQ)

Dead Letter Queue (DLQ)
Термин

Топик Kafka, в который Kafka Connect направляет записи, которые не удалось обработать. При ошибке десериализации, преобразования или записи во внешнюю систему запись помещается в DLQ вместо остановки коннектора. Заголовки записи содержат информацию об ошибке (тип исключения, трассировку стека, имя коннектора). DLQ позволяет команде исследовать проблемные записи без потери данных.

Пример:
# Включение DLQ в конфигурации sink-коннектора
errors.tolerance=all
errors.deadletterqueue.topic.name=orders-dlq
errors.deadletterqueue.topic.replication.factor=3
errors.deadletterqueue.context.headers.enable=true
errors.log.enable=true
errors.log.include.messages=true
# Чтение DLQ для диагностики
kafka-console-consumer.sh \
  --topic orders-dlq \
  --property print.headers=true \
  --bootstrap-server localhost:9092
Подробнее в уроках:

Schema Registry

Schema Registry
Термин

Централизованный сервис для управления схемами сообщений (Avro, Protobuf, JSON Schema). Каждой схеме присваивается числовой ID; продюсеры записывают в топик Magic Byte (0x0) + 4-байтный schema ID + payload. Потребители используют schema ID для получения схемы и десериализации. Schema Registry версионирует схемы в рамках subject и применяет правила совместимости при регистрации новой версии.

Пример:
# Регистрация схемы Avro через REST API
curl -X POST http://schema-registry:8081/subjects/orders-value/versions \
  -H 'Content-Type: application/vnd.schemaregistry.v1+json' \
  -d '{"schema": "{\"type\":\"record\",\"name\":\"Order\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}"}

# Wire Format: [0x00][schema_id: 4 bytes][avro payload]
Подробнее в уроках:

Avro

Avro
Термин

Компактный бинарный формат сериализации с поддержкой эволюции схемы, наиболее часто используемый с Kafka и Schema Registry. Avro-запись не содержит имён полей — они определяются схемой, что делает payload минимальным. Поддерживает добавление/удаление необязательных полей с default-значениями без нарушения совместимости. Поддержка Schema Registry встроена в confluent-kafka-avro клиенты.

Пример:
# Схема Avro (orders.avsc)
{
  "type": "record",
  "name": "Order",
  "namespace": "com.example",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "status", "type": "string", "default": "PENDING"}
  ]
}
# Эволюция: добавление поля с default не нарушает
# обратную совместимость (BACKWARD mode)
Подробнее в уроках:

Режимы совместимости

Schema Compatibility Modes
Термин

Правила Schema Registry, определяющие, какие изменения схемы допустимы при регистрации новой версии. BACKWARD: новая версия может читать данные, записанные старой. FORWARD: старая версия может читать данные, записанные новой. FULL: совместима в обоих направлениях. BACKWARD_TRANSITIVE: совместима со всеми предыдущими версиями, а не только с предыдущей. По умолчанию используется BACKWARD.

Пример:
# Установка режима совместимости для subject
curl -X PUT http://schema-registry:8081/config/orders-value \
  -H 'Content-Type: application/vnd.schemaregistry.v1+json' \
  -d '{"compatibility": "BACKWARD"}'

# Таблица допустимых изменений при BACKWARD:
# Добавить опциональное поле (с default) — РАЗРЕШЕНО
# Удалить поле                           — РАЗРЕШЕНО
# Добавить обязательное поле             — ЗАПРЕЩЕНО
# Изменить тип поля                      — ЗАПРЕЩЕНО
Подробнее в уроках:

Kafka Streams и ksqlDB

KStream

KStream
Термин

Абстракция Kafka Streams, представляющая бесконечный поток записей, где каждая запись интерпретируется как независимое событие. KStream поддерживает stateless-операции (filter, map, flatMap) и stateful-операции (aggregate, join, windowed operations). При материализации KStream создаёт новый топик с результатами. Все записи сохраняются, даже с одинаковым ключом.

Пример:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Order> orders =
    builder.stream("raw-orders");

// Stateless трансформация
KStream<String, EnrichedOrder> enriched = orders
    .filter((key, order) -> order.getAmount() > 0)
    .mapValues(order -> enrich(order));

// Запись в новый топик
enriched.to("enriched-orders",
    Produced.with(Serdes.String(), enrichedSerde));
Подробнее в уроках:

KTable

KTable
Термин

Абстракция Kafka Streams, представляющая changelog-поток, материализованный как таблица: для каждого ключа хранится только последнее значение. Запись с null-значением воспринимается как удаление ключа (tombstone). KTable хранит состояние локально в RocksDB и реплицирует его через changelog-топик. Joins с KTable — это lookup по ключу в текущем снимке состояния.

Пример:
StreamsBuilder builder = new StreamsBuilder();
// KTable из топика user-profiles
KTable<String, UserProfile> profiles =
    builder.table("user-profiles",
        Materialized.as("profiles-store"));

// Join KStream с KTable
KStream<String, EnrichedOrder> enriched = orders.join(
    profiles,
    (order, profile) -> new EnrichedOrder(order, profile.getName())
);
// Join мгновенный: lookup по ключу в локальном state store
Подробнее в уроках:

GlobalKTable

GlobalKTable
Термин

Вариант KTable, в котором каждый экземпляр приложения хранит полную копию таблицы, а не только партиции, назначенные ему. Позволяет выполнять join по произвольному атрибуту (не обязательно по ключу раздела). Полезен для небольших справочных таблиц (например, конфигурация, коды стран), которые нужно иметь на каждом узле. Недостаток: потребляет больше памяти и сети.

Пример:
StreamsBuilder builder = new StreamsBuilder();
// GlobalKTable реплицируется на все экземпляры
GlobalKTable<String, Product> products =
    builder.globalTable("product-catalog");

// Можно join по любому полю значения, не только по ключу
KStream<String, EnrichedOrder> enriched = orders.join(
    products,
    (orderKey, order) -> order.getProductId(), // извлечение ключа для join
    (order, product) -> new EnrichedOrder(order, product)
);
Подробнее в уроках:

State Store (хранилище состояния)

State Store
Термин

Локальное хранилище ключ-значение в Kafka Streams, поддерживающее stateful-операции: агрегации, джоины, windowing. По умолчанию реализован на базе RocksDB (persistent) или HashMap (in-memory). Каждый state store автоматически реплицируется через changelog-топик, что позволяет восстановить состояние при перезапуске или ребалансировке. Standby-реплики обеспечивают быстрое восстановление.

Пример:
// Materialized store для агрегации
KTable<String, Long> orderCounts = orders
    .groupByKey()
    .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>
        as("order-count-store")
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.Long())
    );

// Запрос состояния через Interactive Queries
ReadOnlyKeyValueStore<String, Long> store =
    streams.store(StoreQueryParameters.fromNameAndType(
        "order-count-store", QueryableStoreTypes.keyValueStore()));
Подробнее в уроках:

Оконные операции

Windowing
Термин

Механизм группировки событий по временным окнам для вычисления агрегатов за определённый период. Kafka Streams поддерживает четыре типа окон: Tumbling (неперекрывающиеся окна фиксированного размера), Hopping (перекрывающиеся окна), Session (динамические окна по активности), Sliding (окна фиксированного размера, скользящие по событиям). Результаты хранятся в windowed state store.

Пример:
// Подсчёт заказов за каждые 5 минут (Tumbling Window)
KTable<Windowed<String>, Long> windowedCounts = orders
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .count(Materialized.as("windowed-count-store"));

// Ключ результата: Windowed{key="user-1", window=[12:00, 12:05)}
// Hopping: окно 5 мин, шаг 1 мин — записи считаются в нескольких окнах
TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofSeconds(30))
    .advanceBy(Duration.ofMinutes(1));
Подробнее в уроках:

ksqlDB

ksqlDB
Термин

SQL-движок для потоковой обработки на базе Kafka Streams, позволяющий писать потоковые запросы на SQL без написания Java-кода. Создаёт STREAM (аналог KStream) и TABLE (аналог KTable) поверх топиков Kafka. CSAS (CREATE STREAM AS SELECT) и CTAS (CREATE TABLE AS SELECT) запускают постоянные потоковые вычисления. Поддерживает push-запросы (подписка на поток изменений) и pull-запросы (точечный lookup по состоянию).

Пример:
-- Создание стрима поверх топика
CREATE STREAM orders (
    order_id VARCHAR KEY,
    amount DOUBLE,
    user_id VARCHAR
) WITH (
    KAFKA_TOPIC='raw-orders',
    VALUE_FORMAT='AVRO'
);

-- Постоянное агрегирование (CSAS)
CREATE TABLE order_totals AS
    SELECT user_id, SUM(amount) AS total
    FROM orders
    GROUP BY user_id
    EMIT CHANGES;
Подробнее в уроках:

Interactive Queries (интерактивные запросы)

Interactive Queries
Термин

Механизм Kafka Streams для запроса текущего состояния локальных state stores через HTTP или gRPC без чтения из Kafka. Каждый экземпляр приложения может ответить на запросы к своим локальным партициям. Для распределённых state stores клиент должен найти нужный экземпляр через метаданные (streamsMetadataForStore) и выполнить RPC-запрос к нему. Используется для serving layer в Lambda/Kappa архитектурах.

Пример:
// Экспозиция Interactive Queries через REST
var queryableStore = streams.store(
    StoreQueryParameters.fromNameAndType(
        "order-count-store",
        QueryableStoreTypes.keyValueStore()
    )
);

// Получить метаданные (какой экземпляр хранит ключ)
KeyQueryMetadata metadata = streams.queryMetadataForKey(
    "order-count-store", "user-42", Serdes.String().serializer()
);
// metadata.activeHost() -> {host, port} нужного инстанса
Подробнее в уроках:

Безопасность Kafka

SASL (Simple Authentication and Security Layer)

SASL (Simple Authentication and Security Layer)
Термин

Фреймворк аутентификации, используемый Kafka для проверки подлинности клиентов и брокеров. Kafka поддерживает несколько механизмов SASL: PLAIN (логин/пароль в открытом виде, только поверх TLS), SCRAM-SHA-256 и SCRAM-SHA-512 (безопасный хеш с защитой от перехвата), OAUTHBEARER (JWT-токены), GSSAPI/Kerberos (корпоративные среды с Active Directory). Выбор механизма зависит от инфраструктуры безопасности организации.

Пример:
# Конфигурация брокера для SASL/SCRAM
listeners=SASL_SSL://0.0.0.0:9093
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
sasl.enabled.mechanisms=SCRAM-SHA-512

# Создание пользователя
kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter --add-config 'SCRAM-SHA-512=[iterations=8192,password=secret]' \
  --entity-type users --entity-name alice

# Конфигурация клиента (JAAS)
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="alice" password="secret";
Подробнее в уроках:

SSL/TLS Mutual Authentication

SSL/TLS Mutual Authentication
Термин

Двусторонняя проверка подлинности, при которой и брокер, и клиент предоставляют друг другу сертификаты X.509. Клиент проверяет сертификат брокера через truststore, брокер проверяет сертификат клиента (если ssl.client.auth=required). Обеспечивает шифрование трафика и аутентификацию без дополнительного механизма SASL. Управление жизненным циклом сертификатов — главная операционная сложность.

Пример:
# Конфигурация брокера для mTLS
ssl.keystore.location=/etc/kafka/ssl/kafka.server.keystore.jks
ssl.keystore.password=keystorepass
ssl.key.password=keypass
ssl.truststore.location=/etc/kafka/ssl/kafka.server.truststore.jks
ssl.truststore.password=truststorepass
ssl.client.auth=required

# Конфигурация клиента
ssl.keystore.location=/etc/kafka/ssl/client.keystore.jks
ssl.truststore.location=/etc/kafka/ssl/client.truststore.jks
security.protocol=SSL
Подробнее в уроках:

ACL (Access Control List)

ACL (Access Control List)
Термин

Механизм авторизации Kafka, определяющий права доступа конкретного principal (пользователя или сервиса) к ресурсу (топику, группе, кластеру) для определённой операции (Read, Write, Create, Describe, Delete). ACL проверяется после успешной аутентификации. Хранятся в ZooKeeper или (в KRaft-режиме) в метаданных кластера. Поддерживается ALLOW и DENY правила.

Пример:
# Предоставить alice право записи в топик orders
kafka-acls.sh --bootstrap-server localhost:9092 \
  --add \
  --allow-principal User:alice \
  --operation Write \
  --topic orders

# Предоставить group order-processor право чтения
kafka-acls.sh --bootstrap-server localhost:9092 \
  --add \
  --allow-principal User:service-account \
  --operation Read \
  --group order-processor

# Просмотр всех ACL
kafka-acls.sh --list --bootstrap-server localhost:9092
Подробнее в уроках:

Квоты (Quotas)

Quotas
Термин

Механизм ограничения ресурсов, потребляемых клиентами Kafka, для защиты от шумных соседей. Kafka поддерживает три типа квот: producer_byte_rate (байт/с для продюсеров), consumer_byte_rate (байт/с для потребителей), request_percentage (процент сетевых и I/O потоков). При превышении квоты брокер добавляет задержку в ответы клиента (throttling), не обрывая соединение.

Пример:
# Установка квоты для конкретного пользователя
kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter \
  --add-config 'producer_byte_rate=1048576,consumer_byte_rate=2097152' \
  --entity-type users \
  --entity-name alice

# Квота по умолчанию для всех клиентов
kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter \
  --add-config 'producer_byte_rate=10485760' \
  --entity-type users \
  --entity-default
Подробнее в уроках:

Audit Logging (аудит)

Audit Logging
Термин

Регистрация событий авторизации и операций с кластером для целей безопасности и соответствия требованиям (GDPR, PCI DSS, SOC2). В Kafka с AclAuthorizer все события ALLOW/DENY записываются через log4j в отдельный лог-файл. Kafka Enterprise-решения предлагают централизованный аудит с маршрутизацией событий в SIEM-системы. Audit-лог должен быть защищён от изменений и храниться отдельно.

Пример:
# log4j.properties — включение аудит-лога
log4j.appender.authorizerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.authorizerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.authorizerAppender.File=/var/log/kafka/kafka-authorizer.log
log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.logger.kafka.authorizer.logger=INFO, authorizerAppender
log4j.additivity.kafka.authorizer.logger=false
Подробнее в уроках:

Production Operations

JMX (Java Management Extensions)

JMX (Java Management Extensions)
Термин

Стандартный механизм Java для экспозиции метрик брокера через MBeans. Kafka публикует сотни метрик в пространстве имён kafka.server:type=..., kafka.network:type=..., kafka.log:type=.... Метрики доступны через JMX Exporter (Prometheus), Kafka Exporter или JConsole. Ключевые группы: BrokerTopicMetrics (throughput), ReplicaManager (ISR), NetworkProcessor (latency).

Пример:
# Примеры критических JMX метрик
kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
kafka.server:type=ReplicaManager,name=OfflinePartitionsCount
kafka.controller:type=KafkaController,name=ActiveControllerCount
kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce

# Запуск JMX Exporter для Prometheus
# -javaagent:jmx_prometheus_javaagent.jar=7071:kafka-jmx-config.yml
Подробнее в уроках:

Consumer Lag (лаг потребителя)

Consumer Lag
Термин

Разница между Log End Offset (LEO) лидера партиции и последним зафиксированным смещением группы потребителей. Лаг показывает, сколько записей группа ещё не обработала. Высокий и растущий лаг сигнализирует о том, что потребители не справляются с нагрузкой. Мониторинг лага — ключевая метрика SLA для потоковых систем. Инструменты: kafka-consumer-groups.sh, Burrow, Kafka Lag Exporter.

Пример:
# Просмотр лага всех групп потребителей
kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --describe \
  --group order-processor
# GROUP           TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
# order-processor orders   0          1250            1300            50
# order-processor orders   1          980             980             0
# order-processor orders   2          700             750             50
# Общий лаг: 100 записей
Подробнее в уроках:

Сжатие/расширение ISR

ISR Shrink/Expand
Термин

Динамическое изменение множества In-Sync Replicas в ответ на состояние репликации. ISR сжимается, когда реплика не отправляла Fetch-запрос лидеру дольше replica.lag.time.max.ms. ISR расширяется, когда отставшая реплика догнала лидера и несколько секунд остаётся синхронизированной. Частые изменения ISR (ISR thrashing) создают нагрузку на контроллер и сигнализируют о проблемах с производительностью реплик.

Пример:
# Метрики ISR для мониторинга
kafka.server:type=ReplicaManager,name=IsrShrinksPerSec
kafka.server:type=ReplicaManager,name=IsrExpandsPerSec
kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions

# Параметры, влияющие на ISR
replica.lag.time.max.ms=30000  # реплика выбывает через 30 сек
replica.fetch.min.bytes=1
replica.fetch.wait.max.ms=500

# Алерт: UnderReplicatedPartitions > 0 дольше 5 минут
Подробнее в уроках:

Перераспределение партиций

Partition Reassignment
Термин

Операция переноса лидерства или реплик партиций между брокерами для балансировки нагрузки или замены оборудования. Выполняется через kafka-reassign-partitions.sh или автоматически через Cruise Control. Во время переноса происходит межброкерская репликация, которую необходимо ограничивать (throttling) для предотвращения влияния на производительность клиентов.

Пример:
# Шаг 1: Сгенерировать план перераспределения
kafka-reassign-partitions.sh \
  --bootstrap-server localhost:9092 \
  --generate \
  --topics-to-move-json-file topics.json \
  --broker-list "1,2,3,4" > reassignment.json

# Шаг 2: Применить с троттлингом
kafka-reassign-partitions.sh \
  --bootstrap-server localhost:9092 \
  --execute \
  --reassignment-json-file reassignment.json \
  --throttle 50000000  # 50 MB/s максимум
Подробнее в уроках:

Prometheus + Grafana

Prometheus + Grafana
Термин

Стандартный стек мониторинга Kafka в production. JMX Prometheus Exporter или Kafka Exporter (для consumer lag) собирают метрики и предоставляют их по HTTP. Prometheus скрапирует метрики каждые 15-60 секунд. Grafana визуализирует данные в дашбордах. Готовые дашборды: Kafka Overview (Grafana ID 7589), Consumer Lag Exporter (ID 12483). Alertmanager отправляет уведомления при нарушении пороговых значений.

Пример:
# docker-compose.yml фрагмент
services:
  jmx-exporter:
    image: bitnami/jmx-exporter
    command: ["5556", "/etc/config/kafka.yml"]
    volumes:
      - ./kafka-jmx-config.yml:/etc/config/kafka.yml
  prometheus:
    image: prom/prometheus
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
  grafana:
    image: grafana/grafana
    # Импорт дашборда: grafana.com/dashboards/7589
Подробнее в уроках:

Capacity Planning (планирование ёмкости)

Capacity Planning
Термин

Методика расчёта необходимых ресурсов кластера Kafka на основе ожидаемой нагрузки. Ключевые параметры: пропускная способность (MB/s записи = throughput × replication_factor), объём хранения (throughput × retention × replication_factor), сетевая пропускная способность (запись + чтение × количество групп потребителей). Правило большого пальца: 80% утилизация дискового I/O и сети как максимум.

Пример:
# Формулы расчёта
# Требуемый дисковый I/O на запись:
#   write_throughput * replication_factor
#   Пример: 100 MB/s * 3 = 300 MB/s

# Требуемое хранилище:
#   write_throughput * retention_hours * 3600 * replication_factor
#   Пример: 100 MB/s * 168h * 3600 * 3 = ~181 TB

# Сетевой трафик на брокер (лидер партиции):
#   write + read * num_consumer_groups
#   = 100 + 100 * 3 = 400 MB/s исходящих
Подробнее в уроках:

Продвинутые паттерны

MirrorMaker 2

MirrorMaker 2
Термин

Инструмент для репликации данных между кластерами Kafka, построенный на базе Kafka Connect. Поддерживает топологии active-passive (односторонняя репликация для DR) и active-active (двусторонняя для геораспределённых сред). MirrorMaker 2 сохраняет смещения потребителей через трансляцию (offset translation), позволяя группам переключаться между кластерами без потери позиции. Автоматически создаёт топики с префиксом источника.

Пример:
# mm2.properties — конфигурация репликации active-passive
clusters=primary, secondary
primary.bootstrap.servers=kafka-primary:9092
secondary.bootstrap.servers=kafka-secondary:9092

# Репликация из primary в secondary
primary->secondary.enabled=true
primary->secondary.topics=orders.*,payments.*

# Синхронизация смещений потребителей
primary->secondary.sync.group.offsets.enabled=true
primary->secondary.emit.checkpoints.interval.seconds=10

# Запуск
connect-mirror-maker.sh mm2.properties
Подробнее в уроках:

RPO/RTO

RPO/RTO (Recovery Point Objective / Recovery Time Objective)
Термин

Ключевые метрики для проектирования аварийного восстановления (DR) Kafka-кластеров. RPO (Recovery Point Objective) — максимально допустимый объём потерянных данных, выраженный во времени: сколько данных можно потерять при катастрофическом сбое. RTO (Recovery Time Objective) — максимально допустимое время восстановления сервиса. Для Kafka: RPO определяется задержкой репликации MirrorMaker 2, RTO — временем переключения клиентов на резервный кластер.

Пример:
# Пример анализа RPO/RTO для Kafka DR

# Сценарий: active-passive с MirrorMaker 2
# Задержка репликации: ~5 секунд
# RPO = 5 секунд (данные за 5 сек могут быть потеряны)

# Процедура переключения:
# 1. Обнаружение сбоя primary:      ~30 сек
# 2. Переключение DNS/LB:             ~2 мин
# 3. Перезапуск клиентов:             ~5 мин
# RTO = ~8 минут

# Для RPO=0: синхронная репликация
# (недоступна в стандартной Kafka, требует Kafka Bridge)
Подробнее в уроках:

Event Sourcing (событийное источниковедение)

Event Sourcing
Термин

Архитектурный паттерн, при котором состояние системы определяется не текущими значениями в базе данных, а как результат воспроизведения последовательности событий. Kafka выступает журналом событий: каждое изменение состояния публикуется как неизменяемое событие. Текущее состояние восстанавливается воспроизведением событий от начала лога (или от снимка). Обеспечивает полный аудит, возможность временного перемотки и воспроизводимость.

Пример:
// Паттерн Event Sourcing с Kafka
// Команды -> события -> проекции

// Событие (неизменяемое)
public record OrderPlaced(
    String orderId,
    String userId,
    List<OrderItem> items,
    double totalAmount,
    Instant timestamp
) {}

// Публикация события
producer.send(new ProducerRecord<>(
    "order-events",
    orderId,           // ключ для партиционирования
    new OrderPlaced(orderId, userId, items, total, Instant.now())
));

// Проекция: восстановление состояния через Kafka Streams KTable
Подробнее в уроках:

CQRS (Command Query Responsibility Segregation)

CQRS (Command Query Responsibility Segregation)
Термин

Паттерн разделения команд (изменений состояния) и запросов (чтений) на разные модели. В связке с Kafka: команды публикуются как события в топик, потребители проецируют события в оптимизированные для чтения хранилища (Elasticsearch для поиска, PostgreSQL для транзакций, Redis для кеша). Kafka Streams или ksqlDB создают и поддерживают материализованные представления в актуальном состоянии.

Пример:
// CQRS с Kafka
// Write side: команды -> события
kafka-topic: order-events (append-only event log)

// Read side: несколько проекций для разных сценариев
// 1. PostgreSQL: текущий статус заказа (по ID)
// 2. Elasticsearch: поиск заказов по атрибутам
// 3. Redis: корзина пользователя (TTL)
// 4. KTable в Kafka Streams: агрегаты для дашборда

// Connect Sink обновляет все проекции
// асинхронно из топика order-events
Подробнее в уроках:

Transactional Outbox (транзакционный ящик исходящих)

Transactional Outbox
Термин

Паттерн надёжной публикации событий без распределённых транзакций между базой данных и Kafka. Приложение в одной локальной транзакции записывает изменение данных и событие в таблицу outbox той же базы данных. Отдельный процесс (или Debezium CDC) читает таблицу outbox и публикует события в Kafka. Гарантирует, что событие будет опубликовано тогда и только тогда, когда транзакция в БД зафиксирована.

Пример:
-- Схема outbox таблицы
CREATE TABLE outbox (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    topic VARCHAR NOT NULL,
    key VARCHAR,
    payload JSONB NOT NULL,
    created_at TIMESTAMP DEFAULT NOW(),
    sent_at TIMESTAMP  -- NULL = ещё не отправлено
);

-- В одной транзакции с бизнес-логикой
BEGIN;
  UPDATE orders SET status = 'CONFIRMED' WHERE id = $1;
  INSERT INTO outbox (topic, key, payload)
    VALUES ('order-events', $1, '{"event": "OrderConfirmed", ...}');
COMMIT;
Подробнее в уроках:

Saga Pattern (паттерн Saga)

Saga Pattern
Термин

Паттерн управления распределёнными транзакциями через последовательность локальных транзакций, координируемых через события. При сбое на любом шаге выполняются компенсирующие транзакции в обратном порядке. Хореография (choreography): сервисы реагируют на события друг друга. Оркестрация (orchestration): центральный координатор управляет порядком шагов. Kafka служит шиной событий между сервисами Saga.

Пример:
// Saga хореография: оформление заказа
// 1. OrderService: OrderPlaced -> orders-events
// 2. PaymentService: слушает orders-events,
//    резервирует оплату -> PaymentReserved
// 3. InventoryService: слушает PaymentReserved,
//    резервирует товар -> InventoryReserved
// 4. ShippingService: создаёт доставку -> OrderCompleted

// Компенсация при ошибке на шаге 3:
// InventoryService: InventoryFailed ->
// PaymentService: отменяет резервирование
Подробнее в уроках:

Topic Governance (управление топиками)

Topic Governance
Термин

Набор политик, соглашений и процессов для управления жизненным циклом топиков в масштабируемой Kafka-среде. Включает: соглашения об именовании (домен.сущность.событие.версия), управление схемами через Schema Registry, политики доступа через ACL, определение владельцев топиков, политики хранения и архивирования, процессы создания и удаления топиков. Предотвращает неконтролируемый рост числа топиков.

Пример:
# Соглашение об именовании топиков
# формат: {domain}.{entity}.{event}.{version}

# Примеры:
ordersvc.order.placed.v1
ordersvc.order.shipped.v1
payment.transaction.completed.v2
analytics.user.session.ended.v1

# Метаданные топика (через Custom Config)
kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter --topic ordersvc.order.placed.v1 \
  --add-config 'owner=orders-team,\
  [email protected],\
  sla-tier=gold'
Подробнее в уроках: