Step-by-step реализация
Этот урок — последовательность шагов для постройки capstone. Не “копируй и вставь” — понимай, что и зачем. На каждом этапе debug-чеклист: что должно работать перед переходом к следующему.
Этап 1: Skeleton compose с Kafka + ClickHouse
Старт — два сервиса. Пока без producer, без Spark.
# compose.yml
# Stage 1 skeleton: a single-node Kafka broker (KRaft, no ZooKeeper) and ClickHouse
# attached to one user-defined bridge network.
networks:
  de-net:
    driver: bridge

volumes:
  kafka-data: {}
  ch-data: {}

services:
  kafka:
    image: bitnami/kafka:3.8
    container_name: kafka
    networks:
      - de-net
    ports:
      # Only the EXTERNAL listener is published to the host.
      - "29092:29092"
    volumes:
      - kafka-data:/bitnami/kafka
    environment:
      # One node acts as both controller and broker.
      KAFKA_CFG_NODE_ID: "1"
      KAFKA_CFG_PROCESS_ROLES: "controller,broker"
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "1@kafka:9093"
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      # PLAINTEXT (9092) is advertised as kafka:9092 for containers on de-net;
      # EXTERNAL (29092) is advertised as localhost:29092 for clients on the host.
      KAFKA_CFG_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:29092"
      KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092,EXTERNAL://localhost:29092"
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT"
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
    healthcheck:
      # Healthy once the broker answers a topic listing on the internal listener.
      test:
        - CMD-SHELL
        - "kafka-topics.sh --bootstrap-server localhost:9092 --list || exit 1"
      interval: 10s
      timeout: 5s
      retries: 10
      start_period: 20s

  clickhouse:
    image: clickhouse/clickhouse-server:24.10
    container_name: clickhouse
    networks:
      - de-net
    ports:
      - "8123:8123"  # HTTP interface (used by /ping and JDBC)
      - "9000:9000"  # native protocol
    volumes:
      - ch-data:/var/lib/clickhouse
      # SQL files in this directory are mounted into the image's init directory.
      - ./clickhouse/init:/docker-entrypoint-initdb.d
    environment:
      CLICKHOUSE_USER: "default"
      CLICKHOUSE_PASSWORD: ""
      CLICKHOUSE_DB: "analytics"
      CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT: "1"
    ulimits:
      nofile:
        soft: 262144
        hard: 262144
    healthcheck:
      test:
        - CMD
        - wget
        - --no-verbose
        - --tries=1
        - --spider
        - http://localhost:8123/ping
      interval: 10s
      timeout: 5s
      retries: 5
Schema:
-- clickhouse/init/01-schema.sql
-- Per-minute aggregates written by the Spark streaming job.
-- The job uses the "update" output mode with an append-only JDBC write, so the
-- same (minute, event_type) window is inserted again each time its counters change.
CREATE TABLE IF NOT EXISTS analytics.events_minute
(
    minute       DateTime,
    event_type   LowCardinality(String),
    cnt          UInt64,
    unique_users UInt64
)
-- ReplacingMergeTree collapses rows that share the sorting key during background
-- merges and keeps the most recently inserted one, so re-emitted windows do not
-- pile up. Merges are asynchronous: until they run, duplicates are still visible,
-- so exact reads should use FINAL or max()/argMax() per (minute, event_type).
ENGINE = ReplacingMergeTree()
PARTITION BY toYYYYMM(minute)
ORDER BY (minute, event_type);
mkdir -p clickhouse/init
# создай 01-schema.sql выше
docker compose up -d
docker compose ps
# kafka и clickhouse должны быть Up (healthy) через ~30 сек
Debug чеклист этап 1:
- `docker compose ps` — оба сервиса в статусе `(healthy)`.
- Kafka: `docker compose exec kafka kafka-topics.sh --bootstrap-server localhost:9092 --list` — возвращает пустой список или `__consumer_offsets`.
- ClickHouse: `curl http://localhost:8123/ping` — “Ok.”
- ClickHouse schema: `docker compose exec clickhouse clickhouse-client --query "SHOW TABLES FROM analytics"` — содержит `events_minute`.
Если что-то не работает — читай docker compose logs <service> перед тем, как идти дальше.
Этап 2: Producer на Python + Faker
Kafka log segments: как хранятся сообщения на диске
Создай каталог producer/ с Dockerfile и кодом.
# producer/Dockerfile
# Two-stage build: dependencies are installed into a virtualenv in the full
# image, then only the virtualenv and the sources are copied into the slim one.

# --- Stage 1: build the virtualenv -------------------------------------------
FROM python:3.11 AS builder
WORKDIR /app
# A virtualenv under /opt is world-readable. The previous `pip install --user`
# put packages into /root/.local, which the unprivileged runtime user cannot
# read and which is not on that user's site-packages path.
RUN python -m venv /opt/venv
ENV PATH=/opt/venv/bin:$PATH
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# --- Stage 2: runtime image ---------------------------------------------------
FROM python:3.11-slim
WORKDIR /app
COPY --from=builder /opt/venv /opt/venv
COPY src/ ./src/
# Putting the venv first on PATH makes `python` resolve to the venv interpreter.
ENV PATH=/opt/venv/bin:$PATH \
    PYTHONUNBUFFERED=1
# Run as an unprivileged user.
USER nobody
CMD ["python", "-m", "src.producer"]
# producer/requirements.txt
confluent-kafka==2.5.3
faker==30.8.2
# producer/src/__init__.py (пустой)
# producer/src/producer.py
"""Synthetic user-event generator.

Publishes random JSON events to a Kafka topic at a fixed rate. Configuration is
read from the environment: KAFKA_BOOTSTRAP, TOPIC, EVENTS_PER_SECOND.
"""
import json
import os
import random
import signal
import time

from confluent_kafka import Producer
from faker import Faker

fake = Faker()
EVENT_TYPES = ["click", "view", "purchase", "scroll"]
KAFKA_BOOTSTRAP = os.environ.get("KAFKA_BOOTSTRAP", "kafka:9092")
TOPIC = os.environ.get("TOPIC", "events")
EVENTS_PER_SECOND = int(os.environ.get("EVENTS_PER_SECOND", "10"))


def delivery_report(err, msg):
    """Delivery callback: log failed deliveries, stay silent on success."""
    if err is not None:
        print(f"Delivery FAILED: {err}", flush=True)


def main():
    """Produce events forever; flush pending messages on Ctrl+C or SIGTERM.

    Raises:
        SystemExit: if EVENTS_PER_SECOND is not a positive integer.
    """
    # Guard against division by zero (and a negative sleep) further down.
    if EVENTS_PER_SECOND <= 0:
        raise SystemExit(
            f"EVENTS_PER_SECOND must be a positive integer, got {EVENTS_PER_SECOND}"
        )

    # Turn SIGTERM into KeyboardInterrupt so the `finally` block below runs
    # and buffered messages are flushed before the process exits.
    signal.signal(signal.SIGTERM, signal.default_int_handler)

    producer = Producer({"bootstrap.servers": KAFKA_BOOTSTRAP})
    print(f"Connected to {KAFKA_BOOTSTRAP}, producing to {TOPIC} at {EVENTS_PER_SECOND}/sec", flush=True)

    sleep_interval = 1.0 / EVENTS_PER_SECOND
    counter = 0
    try:
        while True:
            event = {
                "user_id": random.randint(1, 1000),
                "event_type": random.choice(EVENT_TYPES),
                "event_time": int(time.time()),  # epoch seconds
                "session_id": fake.uuid4(),
            }
            # The message is keyed by user_id.
            producer.produce(
                TOPIC,
                key=str(event["user_id"]),
                value=json.dumps(event).encode("utf-8"),
                callback=delivery_report,
            )
            # Serve delivery callbacks without blocking.
            producer.poll(0)
            counter += 1
            if counter % 100 == 0:
                print(f"Produced {counter} events", flush=True)
            time.sleep(sleep_interval)
    except KeyboardInterrupt:
        print("Shutdown requested", flush=True)
    finally:
        # Wait up to 10 s for queued messages to be delivered.
        remaining = producer.flush(10)
        if remaining:
            print(f"Exiting with {remaining} undelivered messages", flush=True)


if __name__ == "__main__":
    main()
Добавь в compose.yml:
services:
  # Event generator built from ./producer; starts only after Kafka is healthy.
  event-producer:
    container_name: event-producer
    build:
      context: ./producer
    restart: unless-stopped
    depends_on:
      kafka:
        condition: service_healthy
    environment:
      KAFKA_BOOTSTRAP: "kafka:9092"
      TOPIC: "events"
      EVENTS_PER_SECOND: "10"
    networks:
      - de-net
docker compose build event-producer
docker compose up -d event-producer
docker compose logs -f event-producer
# Должны быть строки:
# Connected to kafka:9092, producing to events at 10/sec
# Produced 100 events
# Produced 200 events
Сейчас сообщения летят в Kafka, но никто не читает. Проверь:
docker compose exec kafka kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic events --max-messages 5
# Должны увидеть JSON
Debug чеклист этап 2:
- `docker images` показывает образ `event-producer` размером < 200 MB.
- Контейнер event-producer Up (не падает в loop).
- Логи producer’а каждые 10 секунд выдают “Produced N events”.
- kafka-console-consumer получает сообщения.
Этап 3: Spark streaming job
Spark в compose у нас уже из урока 16-03. Адаптируем:
# in compose.yml
# Spark standalone cluster: one master and two workers. Job sources and the
# streaming checkpoint directory are shared between all of them.
volumes:
  spark-checkpoint: {}

services:
  spark-master:
    image: bitnami/spark:3.5
    container_name: spark-master
    networks:
      - de-net
    ports:
      - "8080:8080"  # master web UI
      - "7077:7077"  # endpoint behind spark://spark-master:7077
    environment:
      SPARK_MODE: "master"
      SPARK_RPC_AUTHENTICATION_ENABLED: "no"
      SPARK_SSL_ENABLED: "no"
    volumes:
      - ./spark/jobs:/opt/jobs
      - spark-checkpoint:/opt/checkpoints

  # No container_name here: the service is scaled to two replicas.
  spark-worker:
    image: bitnami/spark:3.5
    networks:
      - de-net
    depends_on:
      spark-master:
        condition: service_started
    environment:
      SPARK_MODE: "worker"
      SPARK_MASTER_URL: "spark://spark-master:7077"
      SPARK_WORKER_MEMORY: "2G"
      SPARK_WORKER_CORES: "2"
    volumes:
      - ./spark/jobs:/opt/jobs
      - spark-checkpoint:/opt/checkpoints
    deploy:
      replicas: 2
Spark job:
# spark/jobs/streaming_aggregation.py
"""Structured Streaming job: Kafka topic "events" -> per-minute aggregates -> ClickHouse.

Reads JSON events from Kafka, counts events and approximate distinct users per
(1-minute window, event_type), and appends every micro-batch to
analytics.events_minute over JDBC.
"""
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    approx_count_distinct,
    col,
    count as cnt_func,
    from_json,
    window,
)
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, LongType

spark = (
    SparkSession.builder
    .appName("UserEventsStreaming")
    .config("spark.sql.shuffle.partitions", "4")
    .getOrCreate()
)

# Shape of the JSON payload in the Kafka message value.
schema = StructType([
    StructField("user_id", IntegerType()),
    StructField("event_type", StringType()),
    StructField("event_time", LongType()),  # epoch seconds
    StructField("session_id", StringType()),
])

# 1. Read from Kafka
events = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "events")
    .option("startingOffsets", "earliest")
    .load()
)

# 2. Parse JSON
parsed = (
    events
    .selectExpr("CAST(value AS STRING) as json")
    .select(from_json(col("json"), schema).alias("data"))
    .select("data.*")
    # Casting epoch seconds straight to timestamp avoids the string round-trip
    # of from_unixtime()/to_timestamp(), which depends on the session time zone.
    .withColumn("event_ts", col("event_time").cast("timestamp"))
)

# 3. Aggregate by minute window
agg = (
    parsed
    .withWatermark("event_ts", "2 minutes")
    .groupBy(
        window(col("event_ts"), "1 minute"),
        col("event_type"),
    )
    .agg(
        cnt_func("*").alias("cnt"),
        approx_count_distinct("user_id").alias("unique_users"),
    )
    .selectExpr(
        "window.start as minute",
        "event_type",
        "cnt",
        "unique_users",
    )
)


# 4. Write to ClickHouse via JDBC (foreachBatch)
def write_to_clickhouse(batch_df, batch_id):
    """Append one micro-batch of aggregates to analytics.events_minute.

    Args:
        batch_df: DataFrame with the rows emitted for this micro-batch.
        batch_id: Micro-batch id supplied by Spark; used only for logging.
    """
    # count() and save() are two separate actions; persisting keeps Spark from
    # computing the batch twice.
    batch_df.persist()
    try:
        rows = batch_df.count()
        print(f"Writing batch {batch_id}, rows: {rows}", flush=True)
        if rows == 0:
            # Nothing to insert; skip the JDBC call.
            return
        (
            batch_df.write
            .format("jdbc")
            .option("driver", "com.clickhouse.jdbc.ClickHouseDriver")
            .option("url", "jdbc:clickhouse://clickhouse:8123/analytics")
            .option("dbtable", "events_minute")
            .option("user", "default")
            .option("password", "")
            .mode("append")
            .save()
        )
    finally:
        batch_df.unpersist()


# NOTE: "update" mode re-emits a window's row every time its counters change.
# Combined with mode("append") above, events_minute receives several rows per
# (minute, event_type); readers must take the latest/max value per key instead
# of summing across rows.
query = (
    agg.writeStream
    .outputMode("update")
    .foreachBatch(write_to_clickhouse)
    .option("checkpointLocation", "/opt/checkpoints/events_minute")
    .trigger(processingTime="30 seconds")
    .start()
)
query.awaitTermination()
Submit:
docker compose exec spark-master spark-submit \
--master spark://spark-master:7077 \
--total-executor-cores 4 \
--executor-memory 1G \
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3,com.clickhouse:clickhouse-jdbc:0.6.5,com.clickhouse:clickhouse-http-client:0.6.5 \
/opt/jobs/streaming_aggregation.py
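При первом запуске Spark скачает JAR’ы (Kafka connector + ClickHouse JDBC). Это 2-3 минуты. JAR’ы кэшируются в каталоге .ivy2 внутри контейнера spark-master; отдельного volume для .ivy2 в compose выше нет, поэтому после пересоздания контейнера они скачаются заново. Чтобы кэш пережил пересоздание, смонтируй этот каталог как volume.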
Debug чеклист этап 3:
- Spark master UI на http://localhost:8080 — 2 workers ALIVE.
- После submit — “UserEventsStreaming” в Running applications.
- В логах spark-submit видишь “Writing batch N, rows: M”.
- `docker compose exec clickhouse clickhouse-client --query "SELECT count() FROM analytics.events_minute"` — значение растёт каждые ~30 секунд.
Если падает с ClassNotFoundException com.clickhouse.jdbc.ClickHouseDriver — проверь, что --packages правильный. Если падает с Kafka authentication errors — проверь, что kafka.bootstrap.servers=kafka:9092 (internal listener, не localhost).
Этап 4: Grafana с дашбордом
# in compose.yml
# Grafana with the ClickHouse datasource plugin; datasources and dashboards are
# provisioned from ./grafana/provisioning (mounted read-only).
volumes:
  grafana-data: {}

services:
  grafana:
    image: grafana/grafana:11.3.1
    container_name: grafana
    networks:
      - de-net
    ports:
      - "3000:3000"
    depends_on:
      clickhouse:
        condition: service_healthy
    environment:
      GF_SECURITY_ADMIN_PASSWORD: "admin"
      GF_INSTALL_PLUGINS: "grafana-clickhouse-datasource"
      GF_USERS_ALLOW_SIGN_UP: "false"
    volumes:
      - grafana-data:/var/lib/grafana
      - ./grafana/provisioning:/etc/grafana/provisioning:ro
Datasource provisioning:
# grafana/provisioning/datasources/clickhouse.yml
# Provisions the ClickHouse datasource used by the dashboard panels.
apiVersion: 1

datasources:
  - name: ClickHouse
    # Fixed uid: the dashboard panels reference this datasource as uid
    # "clickhouse". Without it Grafana generates its own uid and the panels
    # cannot resolve the datasource.
    uid: clickhouse
    type: grafana-clickhouse-datasource
    isDefault: true
    jsonData:
      # `host` is the key used in the plugin's provisioning documentation;
      # `server` is the older spelling of the same setting.
      host: clickhouse
      port: 9000
      protocol: native
      defaultDatabase: analytics
      username: default
    secureJsonData:
      password: ""
Dashboard provisioning config:
# grafana/provisioning/dashboards/dashboards.yml
# File-based dashboard provider: dashboards are read from the directory in `path`.
apiVersion: 1

providers:
  - name: "default"
    type: file
    orgId: 1
    folder: ""
    options:
      path: /etc/grafana/provisioning/dashboards
Dashboard JSON (минимальный):
# grafana/provisioning/dashboards/events-dashboard.json
{
  "title": "User Events",
  "uid": "user-events",
  "schemaVersion": 38,
  "version": 1,
  "panels": [
    {
      "id": 1,
      "title": "Events per minute",
      "type": "timeseries",
      "gridPos": {"h": 8, "w": 12, "x": 0, "y": 0},
      "datasource": {"type": "grafana-clickhouse-datasource", "uid": "clickhouse"},
      "targets": [
        {
          "editorType": "sql",
          "queryType": "timeseries",
          "format": 0,
          "rawSql": "SELECT minute as time, event_type, max(cnt) as count FROM analytics.events_minute WHERE $__timeFilter(minute) GROUP BY minute, event_type ORDER BY minute"
        }
      ]
    },
    {
      "id": 2,
      "title": "Unique users per minute",
      "type": "timeseries",
      "gridPos": {"h": 8, "w": 12, "x": 12, "y": 0},
      "datasource": {"type": "grafana-clickhouse-datasource", "uid": "clickhouse"},
      "targets": [
        {
          "editorType": "sql",
          "queryType": "timeseries",
          "format": 0,
          "rawSql": "SELECT minute as time, event_type, max(unique_users) as users FROM analytics.events_minute WHERE $__timeFilter(minute) GROUP BY minute, event_type ORDER BY minute"
        }
      ]
    }
  ]
}
docker compose up -d grafana
open http://localhost:3000 # admin/admin
# Dashboards -> User Events -- должны видеть данные за последние 15 минут
Debug чеклист этап 4:
- Grafana доступна на http://localhost:3000.
- Datasource “ClickHouse” не в красном (Configuration -> Datasources).
- Дашборд “User Events” импортирован автоматически.
- Графики показывают данные (если producer работает >2 минуты).
Этап 5: финальные штрихи
.env и .env.example
# .env
CLICKHOUSE_PASSWORD=
GRAFANA_ADMIN_PASSWORD=admin
EVENTS_PER_SECOND=10
# in compose.yml (entries under `services:`)
# Values are taken from .env; the value after ":-" is the fallback.
event-producer:
  environment:
    EVENTS_PER_SECOND: "${EVENTS_PER_SECOND:-10}"

grafana:
  environment:
    GF_SECURITY_ADMIN_PASSWORD: "${GRAFANA_ADMIN_PASSWORD:-admin}"
Makefile
# Convenience targets for the capstone stack.
# Recipe lines must be indented with a TAB character, not spaces.

# Every target here is a command, not a file, so all of them are declared phony.
.PHONY: up down down-v logs ps submit-spark reset dashboard verify

# Maven coordinates passed to spark-submit: Kafka source + ClickHouse JDBC driver.
SPARK_PACKAGES := org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3,com.clickhouse:clickhouse-jdbc:0.6.5,com.clickhouse:clickhouse-http-client:0.6.5

# Start the stack in the background; --wait blocks until services are running/healthy.
up:
	docker compose up -d --wait

# Stop and remove containers, keep volumes.
down:
	docker compose down

# Stop and remove containers together with named volumes.
down-v:
	docker compose down -v

logs:
	docker compose logs -f

ps:
	docker compose ps

# Submit the streaming job in detached mode, using the same resource flags as
# the manual spark-submit command.
submit-spark:
	docker compose exec -d spark-master spark-submit \
		--master spark://spark-master:7077 \
		--total-executor-cores 4 \
		--executor-memory 1G \
		--packages $(SPARK_PACKAGES) \
		/opt/jobs/streaming_aggregation.py

# Recreate everything from scratch, then resubmit the job.
reset: down-v up
	sleep 30
	$(MAKE) submit-spark

dashboard:
	open http://localhost:3000

# Row count and covered time range of the aggregate table.
verify:
	@docker compose exec -T clickhouse clickhouse-client \
		--query "SELECT count(), min(minute), max(minute) FROM analytics.events_minute"
Полный поток запуска
# 1. clone репо
# 2. config
cp .env.example .env
# 3. build
docker compose build
# 4. up
make up
# 5. wait for healthy
sleep 30
# 6. submit Spark job
make submit-spark
# 7. wait for aggregates
sleep 90
# 8. verify
make verify
# 5 2026-05-15 14:23:00 2026-05-15 14:27:00
# 9. dashboard
make dashboard
Типичные ошибки и debug
1. event-producer падает с “Connection refused”: kafka не healthy ещё. Добавь depends_on: kafka: condition: service_healthy.
2. spark-submit падает с “ClassNotFoundException kafka”: забыл --packages spark-sql-kafka-0-10. Версия должна совпадать с Spark (3.5.x).
3. spark пишет в ClickHouse, но events_minute пустая: проверь, что watermark и trigger срабатывают. Если EVENTS_PER_SECOND слишком низкий (<1), может не хватать данных для batch. Trigger=30s — первый batch будет через 30 сек после старта.
4. Grafana показывает “No data”: проверь datasource (Configuration). Если plugin grafana-clickhouse-datasource не установился (видишь “Unknown plugin”), docker compose logs grafana покажет ошибку. Иногда нужно явно установить: GF_INSTALL_PLUGINS=grafana-clickhouse-datasource.
5. После docker compose restart Spark теряет offset: значит, checkpoint volume не работает. Проверь: volumes: [spark-checkpoint:/opt/checkpoints] и checkpointLocation=/opt/checkpoints/events_minute.
Что осталось
После этого урока у тебя должен быть работающий стенд. В следующем уроке — детальная валидация: что именно проверить, какие edge cases ловить.