Learning Platform
Глоссарий Troubleshooting
Урок 18.02 · 26 мин
Средний
dockerdata-engineeringkafkacomposestreaming

Kafka в compose: KRaft mode + Kafka UI

Apache Kafka — это распределённый log-store, в который продюсеры пишут сообщения, а консьюмеры их читают. Для DE это hot path streaming-данных: события приходят из приложения, складываются в Kafka topic, оттуда читаются Spark Streaming / Flink / ClickHouse Kafka Engine.

Локально поднимать Kafka раньше было больно: нужен был Zookeeper (отдельный сервис), правильная конфигурация listeners, и куча параметров вроде KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1. С версии Kafka 3.3 (Released 2022) появился KRaft mode — Kafka умеет работать без Zookeeper, координируясь через собственный Raft-протокол. Это упростило локальные стенды в разы.

В этом уроке поднимем минимальный Kafka-стенд: один broker в KRaft + Kafka UI для просмотра topics и сообщений.


Что такое KRaft и почему он важен

Kafka до версии 3.3 требовала Zookeeper — отдельный кластер для хранения metadata (список topics, partitions, ISR, конфиги). Это означало:

  • Два кластера на проде вместо одного.
  • Отдельный сервис в compose (zookeeper:) с правильными портами и volumes.
  • Дополнительные сетевые задержки на metadata-операциях.

KRaft mode (Kafka Raft) встроил metadata-хранилище в саму Kafka. Один и тот же broker может быть и data plane (хранить сообщения), и control plane (Raft-консенсус metadata). Никаких Zookeeper-контейнеров — на 30% меньше YAML и одна точка failure меньше.

Kafka раньше vs KRaft: метаданные внутри broker'а
OLD: Kafka + Zookeeper2 сервисаZookeeper хранил список topics, ACL, broker IDs. Если ZK падал — Kafka не могла обрабатывать metadata-операции. Сложная конфигурация zookeeper.connect.
Kafka 3.3+
NEW: KRaft mode1 сервисBroker сам ведёт Raft-лог metadata. В compose -- один сервис вместо двух. Production-ready начиная с Kafka 3.5.
Combined nodeCombined mode: один процесс выполняет роли controller (metadata) и broker (data). Идеально для dev/single-node.
Multi-node prodProduction: 3+ controller-нод и N broker-нод. Controllers держат Raft-кворум metadata.

Лог-сегменты — как Kafka хранит сообщения на диске

Listeners: внутренний и внешний доступ

Самая хитрая часть Kafka — listeners. Kafka clients сначала подключаются к broker’у, запрашивают metadata (где находятся партиции), и потом подключаются к нужным broker’ам по адресам, которые им вернул сам broker. Это значит, что broker должен анонсировать правильный адрес.

В compose-сети у нас два мира:

  • Внутри сети: другие сервисы (Spark, ClickHouse, kafka-ui) обращаются к broker’у по DNS-имени kafka:9092.
  • Снаружи сети: наш Python-скрипт с хоста (docker-compose exec не считается — он внутри сети) подключается по localhost:29092.

Чтобы оба мира работали, настраиваем два listener’а:

KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:29092
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,EXTERNAL://localhost:29092
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER

Разбор:

  • LISTENERS — какие порты broker слушает внутри контейнера. PLAINTEXT на 9092 (для внутренних клиентов), CONTROLLER на 9093 (для Raft), EXTERNAL на 29092 (для хоста).
  • ADVERTISED_LISTENERS — какие адреса broker возвращает клиентам. Внутренний kafka:9092 для compose-сети, localhost:29092 для хоста.
  • LISTENER_SECURITY_PROTOCOL_MAP — какой протокол на каждом listener’е (для dev — везде PLAINTEXT, в проде — SSL/SASL).
  • INTER_BROKER_LISTENER_NAME — какой listener brokers используют для общения между собой (актуально при multi-broker).
  • CONTROLLER_LISTENER_NAMES — какой listener для Raft control plane.
WARNING

ВНИМАНИЕ: если в ADVERTISED_LISTENERS указать только localhost:29092, то другие сервисы в compose-сети (Spark, ClickHouse) попробуют подключиться к localhost внутри СВОЕГО контейнера и получат connection refused. Всегда два listener’а: один с DNS-именем сервиса для compose, второй с localhost для хоста.


Compose-файл: Kafka + Kafka UI

services:
  kafka:
    image: bitnami/kafka:3.8
    container_name: kafka
    ports:
      - "29092:29092"
    environment:
      # KRaft mode
      KAFKA_CFG_NODE_ID: 1
      KAFKA_CFG_PROCESS_ROLES: controller,broker
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      # Listeners
      KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:29092
      KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,EXTERNAL://localhost:29092
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
      # Single-node settings
      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR: 1
      # Auto-create topics (для dev удобно)
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true'
    volumes:
      - kafka-data:/bitnami/kafka
    healthcheck:
      test: ["CMD-SHELL", "kafka-topics.sh --bootstrap-server localhost:9092 --list || exit 1"]
      interval: 10s
      timeout: 5s
      retries: 10
    networks:
      - kafka-net

  kafka-ui:
    image: provectuslabs/kafka-ui:v0.7.2
    container_name: kafka-ui
    ports:
      - "8090:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
      DYNAMIC_CONFIG_ENABLED: 'true'
    depends_on:
      kafka:
        condition: service_healthy
    networks:
      - kafka-net

volumes:
  kafka-data:

networks:
  kafka-net:
Архитектура: один broker в KRaft + UI
bitnami/kafka:3.8broker + controllerОдин процесс KRaft в combined-mode. Слушает 9092 (internal), 9093 (controller Raft), 29092 (external для хоста).
kafka-ui :8090GUIВеб-интерфейс. Подключается к kafka:9092 (DNS из compose). Показывает topics, partitions, messages, consumer groups.
host: producerPython-producer на хосте. Подключается к localhost:29092 (EXTERNAL listener).
29092
brokerKafka broker внутри compose. Принимает на 29092, обрабатывает в стандартном log-формате.
9092
internal consumerConsumer внутри compose-сети (например, Spark). Подключается к kafka:9092.

Запуск и базовые операции

# Запуск
docker compose up -d

# Проверка
docker compose ps
# kafka       Up (healthy)
# kafka-ui    Up

# UI
open http://localhost:8090

Создание topic

Через kafka-topics.sh внутри контейнера:

docker compose exec kafka kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --create \
  --topic events \
  --partitions 3 \
  --replication-factor 1

# Список topics
docker compose exec kafka kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --list

# Описание topic
docker compose exec kafka kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --describe \
  --topic events

В Kafka UI создание topic тоже доступно — Topics -> Add a Topic. Но для воспроизводимости лучше держать в shell-скриптах или Terraform.

Producer / Consumer пример

Самый быстрый способ проверить, что Kafka работает — это два терминала:

Терминал 1 — producer:

docker compose exec kafka kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic events
# Введи строки, Enter -- они уйдут в topic:
# hello kafka
# from junior DE

Терминал 2 — consumer:

docker compose exec kafka kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic events \
  --from-beginning
# hello kafka
# from junior DE

--from-beginning — читать с offset 0. Без флага consumer стартует с latest и видит только новые сообщения.


Python producer/consumer (confluent-kafka)

Реальный DE будет писать в Kafka не через kafka-console-producer, а из своего кода. Минимальный пример на confluent-kafka (самая быстрая Python-библиотека, обёртка вокруг librdkafka):

# requirements.txt: confluent-kafka==2.5.3
from confluent_kafka import Producer
import json
import time

producer = Producer({"bootstrap.servers": "localhost:29092"})

def delivery_report(err, msg):
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} partition {msg.partition()} offset {msg.offset()}")

for i in range(10):
    event = {"user_id": i, "action": "click", "ts": int(time.time())}
    producer.produce(
        "events",
        key=str(i),
        value=json.dumps(event).encode("utf-8"),
        callback=delivery_report,
    )
    producer.poll(0)

producer.flush()

Обрати внимание: localhost:29092 — мы стучим с хоста, поэтому используем EXTERNAL listener. Если бы скрипт сам был контейнером в compose-сети — было бы kafka:9092.

Consumer:

from confluent_kafka import Consumer
import json

consumer = Consumer({
    "bootstrap.servers": "localhost:29092",
    "group.id": "junior-de-consumer",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["events"])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Error: {msg.error()}")
            continue
        event = json.loads(msg.value())
        print(f"Got event: {event}")
finally:
    consumer.close()

Kafka UI: что там смотреть

provectuslabs/kafka-ui — open-source GUI. Что видно через UI:

  • Topics — список, partitions, message count, retention.
  • Messages — открыть topic, увидеть последние N сообщений. Удобно для debug.
  • Consumers — список consumer groups, их offsets, lag.
  • Brokers — broker IDs, leader/follower partitions.

В dev это незаменимый инструмент: после каждого kafka-console-producer можно зайти в UI и увидеть конкретное сообщение с его key, value, headers, partition, offset.

TIP

Redpanda Console (docker.redpanda.com/redpandadata/console:latest) — альтернатива Kafka UI с более полированным UX. Подключение к Apache Kafka работает идентично, конфиг попроще. Если provectuslabs не нравится — попробуй её.


Попробуй сам

# 1. Подними стенд
docker compose up -d

# 2. Жди healthy (~20 секунд)
docker compose ps

# 3. Создай topic
docker compose exec kafka kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --create --topic events --partitions 3 --replication-factor 1

# 4. Открой UI
open http://localhost:8090
# -> Topics -> events

# 5. Producer в одном терминале:
docker compose exec kafka kafka-console-producer.sh \
  --bootstrap-server localhost:9092 --topic events
# Введи несколько строк, Ctrl+D для выхода

# 6. Consumer в другом терминале:
docker compose exec kafka kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 --topic events \
  --from-beginning --max-messages 5

# 7. Посмотри в UI -> Topics -> events -> Messages, увидишь свои сообщения

# 8. Стресс-тест: 10000 сообщений
docker compose exec kafka bash -c \
  "for i in {1..10000}; do echo msg-$i; done | \
   kafka-console-producer.sh --bootstrap-server localhost:9092 --topic events"

# 9. UI покажет рост count и offsets

# 10. Cleanup
docker compose down -v

Что дальше

С работающим Kafka-стендом ты теперь можешь:

  • Подключить Spark Streaming (следующий урок), чтобы читать из events topic и агрегировать.
  • Подключить ClickHouse Kafka Engine, чтобы стримить данные напрямую в OLAP-БД (через урок).
  • Использовать Kafka в Airflow-DAG’ах: producer-task пишет, consumer-task читает.

KRaft + один listener-пара (internal+external) + Kafka UI — это базовый kit, который покроет 90% локальных стендов для junior DE.


Проверка знанийKnowledge check
Ты добавил в compose сервис spark-streaming, который пытается подключиться к Kafka. В spark-job ты используешь bootstrap.servers=localhost:29092 и получаешь Connection refused. Почему и как починить?
ОтветAnswer
Spark внутри compose-сети. localhost:29092 для него -- это его собственный контейнер, а не хост. EXTERNAL listener на 29092 пробрасывается с broker'а только на хост-машину через port mapping. Внутри compose-сети нужно обращаться к Kafka по DNS-имени сервиса: bootstrap.servers=kafka:9092. Это INTERNAL listener (PLAINTEXT://kafka:9092 в ADVERTISED_LISTENERS). Правило: с хоста -> localhost:29092, из compose -> kafka:9092.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Что такое KRaft mode в Apache Kafka и почему он важен для compose-стендов?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5