Kafka в compose: KRaft mode + Kafka UI
Apache Kafka — это распределённый log-store, в который продюсеры пишут сообщения, а консьюмеры их читают. Для DE это hot path streaming-данных: события приходят из приложения, складываются в Kafka topic, оттуда читаются Spark Streaming / Flink / ClickHouse Kafka Engine.
Локально поднимать Kafka раньше было больно: нужен был Zookeeper (отдельный сервис), правильная конфигурация listeners, и куча параметров вроде KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1. С версии Kafka 3.3 (Released 2022) появился KRaft mode — Kafka умеет работать без Zookeeper, координируясь через собственный Raft-протокол. Это упростило локальные стенды в разы.
В этом уроке поднимем минимальный Kafka-стенд: один broker в KRaft + Kafka UI для просмотра topics и сообщений.
Что такое KRaft и почему он важен
Kafka до версии 3.3 требовала Zookeeper — отдельный кластер для хранения metadata (список topics, partitions, ISR, конфиги). Это означало:
- Два кластера на проде вместо одного.
- Отдельный сервис в compose (
zookeeper:) с правильными портами и volumes. - Дополнительные сетевые задержки на metadata-операциях.
KRaft mode (Kafka Raft) встроил metadata-хранилище в саму Kafka. Один и тот же broker может быть и data plane (хранить сообщения), и control plane (Raft-консенсус metadata). Никаких Zookeeper-контейнеров — на 30% меньше YAML и одна точка failure меньше.
Лог-сегменты — как Kafka хранит сообщения на диске
Listeners: внутренний и внешний доступ
Самая хитрая часть Kafka — listeners. Kafka clients сначала подключаются к broker’у, запрашивают metadata (где находятся партиции), и потом подключаются к нужным broker’ам по адресам, которые им вернул сам broker. Это значит, что broker должен анонсировать правильный адрес.
В compose-сети у нас два мира:
- Внутри сети: другие сервисы (Spark, ClickHouse, kafka-ui) обращаются к broker’у по DNS-имени
kafka:9092. - Снаружи сети: наш Python-скрипт с хоста (
docker-compose execне считается — он внутри сети) подключается поlocalhost:29092.
Чтобы оба мира работали, настраиваем два listener’а:
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:29092
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,EXTERNAL://localhost:29092
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
Разбор:
LISTENERS— какие порты broker слушает внутри контейнера. PLAINTEXT на 9092 (для внутренних клиентов), CONTROLLER на 9093 (для Raft), EXTERNAL на 29092 (для хоста).ADVERTISED_LISTENERS— какие адреса broker возвращает клиентам. Внутреннийkafka:9092для compose-сети,localhost:29092для хоста.LISTENER_SECURITY_PROTOCOL_MAP— какой протокол на каждом listener’е (для dev — везде PLAINTEXT, в проде — SSL/SASL).INTER_BROKER_LISTENER_NAME— какой listener brokers используют для общения между собой (актуально при multi-broker).CONTROLLER_LISTENER_NAMES— какой listener для Raft control plane.
ВНИМАНИЕ: если в ADVERTISED_LISTENERS указать только localhost:29092, то другие сервисы в compose-сети (Spark, ClickHouse) попробуют подключиться к localhost внутри СВОЕГО контейнера и получат connection refused. Всегда два listener’а: один с DNS-именем сервиса для compose, второй с localhost для хоста.
Compose-файл: Kafka + Kafka UI
services:
kafka:
image: bitnami/kafka:3.8
container_name: kafka
ports:
- "29092:29092"
environment:
# KRaft mode
KAFKA_CFG_NODE_ID: 1
KAFKA_CFG_PROCESS_ROLES: controller,broker
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
# Listeners
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:29092
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,EXTERNAL://localhost:29092
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
# Single-node settings
KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR: 1
# Auto-create topics (для dev удобно)
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true'
volumes:
- kafka-data:/bitnami/kafka
healthcheck:
test: ["CMD-SHELL", "kafka-topics.sh --bootstrap-server localhost:9092 --list || exit 1"]
interval: 10s
timeout: 5s
retries: 10
networks:
- kafka-net
kafka-ui:
image: provectuslabs/kafka-ui:v0.7.2
container_name: kafka-ui
ports:
- "8090:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
DYNAMIC_CONFIG_ENABLED: 'true'
depends_on:
kafka:
condition: service_healthy
networks:
- kafka-net
volumes:
kafka-data:
networks:
kafka-net:
Запуск и базовые операции
# Запуск
docker compose up -d
# Проверка
docker compose ps
# kafka Up (healthy)
# kafka-ui Up
# UI
open http://localhost:8090
Создание topic
Через kafka-topics.sh внутри контейнера:
docker compose exec kafka kafka-topics.sh \
--bootstrap-server localhost:9092 \
--create \
--topic events \
--partitions 3 \
--replication-factor 1
# Список topics
docker compose exec kafka kafka-topics.sh \
--bootstrap-server localhost:9092 \
--list
# Описание topic
docker compose exec kafka kafka-topics.sh \
--bootstrap-server localhost:9092 \
--describe \
--topic events
В Kafka UI создание topic тоже доступно — Topics -> Add a Topic. Но для воспроизводимости лучше держать в shell-скриптах или Terraform.
Producer / Consumer пример
Самый быстрый способ проверить, что Kafka работает — это два терминала:
Терминал 1 — producer:
docker compose exec kafka kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic events
# Введи строки, Enter -- они уйдут в topic:
# hello kafka
# from junior DE
Терминал 2 — consumer:
docker compose exec kafka kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic events \
--from-beginning
# hello kafka
# from junior DE
--from-beginning — читать с offset 0. Без флага consumer стартует с latest и видит только новые сообщения.
Python producer/consumer (confluent-kafka)
Реальный DE будет писать в Kafka не через kafka-console-producer, а из своего кода. Минимальный пример на confluent-kafka (самая быстрая Python-библиотека, обёртка вокруг librdkafka):
# requirements.txt: confluent-kafka==2.5.3
from confluent_kafka import Producer
import json
import time
producer = Producer({"bootstrap.servers": "localhost:29092"})
def delivery_report(err, msg):
if err is not None:
print(f"Delivery failed: {err}")
else:
print(f"Delivered to {msg.topic()} partition {msg.partition()} offset {msg.offset()}")
for i in range(10):
event = {"user_id": i, "action": "click", "ts": int(time.time())}
producer.produce(
"events",
key=str(i),
value=json.dumps(event).encode("utf-8"),
callback=delivery_report,
)
producer.poll(0)
producer.flush()
Обрати внимание: localhost:29092 — мы стучим с хоста, поэтому используем EXTERNAL listener. Если бы скрипт сам был контейнером в compose-сети — было бы kafka:9092.
Consumer:
from confluent_kafka import Consumer
import json
consumer = Consumer({
"bootstrap.servers": "localhost:29092",
"group.id": "junior-de-consumer",
"auto.offset.reset": "earliest",
})
consumer.subscribe(["events"])
try:
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
print(f"Error: {msg.error()}")
continue
event = json.loads(msg.value())
print(f"Got event: {event}")
finally:
consumer.close()
Kafka UI: что там смотреть
provectuslabs/kafka-ui — open-source GUI. Что видно через UI:
- Topics — список, partitions, message count, retention.
- Messages — открыть topic, увидеть последние N сообщений. Удобно для debug.
- Consumers — список consumer groups, их offsets, lag.
- Brokers — broker IDs, leader/follower partitions.
В dev это незаменимый инструмент: после каждого kafka-console-producer можно зайти в UI и увидеть конкретное сообщение с его key, value, headers, partition, offset.
Redpanda Console (docker.redpanda.com/redpandadata/console:latest) — альтернатива Kafka UI с более полированным UX. Подключение к Apache Kafka работает идентично, конфиг попроще. Если provectuslabs не нравится — попробуй её.
Попробуй сам
# 1. Подними стенд
docker compose up -d
# 2. Жди healthy (~20 секунд)
docker compose ps
# 3. Создай topic
docker compose exec kafka kafka-topics.sh \
--bootstrap-server localhost:9092 \
--create --topic events --partitions 3 --replication-factor 1
# 4. Открой UI
open http://localhost:8090
# -> Topics -> events
# 5. Producer в одном терминале:
docker compose exec kafka kafka-console-producer.sh \
--bootstrap-server localhost:9092 --topic events
# Введи несколько строк, Ctrl+D для выхода
# 6. Consumer в другом терминале:
docker compose exec kafka kafka-console-consumer.sh \
--bootstrap-server localhost:9092 --topic events \
--from-beginning --max-messages 5
# 7. Посмотри в UI -> Topics -> events -> Messages, увидишь свои сообщения
# 8. Стресс-тест: 10000 сообщений
docker compose exec kafka bash -c \
"for i in {1..10000}; do echo msg-$i; done | \
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic events"
# 9. UI покажет рост count и offsets
# 10. Cleanup
docker compose down -v
Что дальше
С работающим Kafka-стендом ты теперь можешь:
- Подключить Spark Streaming (следующий урок), чтобы читать из
eventstopic и агрегировать. - Подключить ClickHouse Kafka Engine, чтобы стримить данные напрямую в OLAP-БД (через урок).
- Использовать Kafka в Airflow-DAG’ах: producer-task пишет, consumer-task читает.
KRaft + один listener-пара (internal+external) + Kafka UI — это базовый kit, который покроет 90% локальных стендов для junior DE.