Learning Platform
Глоссарий Troubleshooting
Урок 21.02 · 28 мин
Средний
dockercapstonekafkasparkclickhouseimplementation

Step-by-step реализация

Этот урок — последовательность шагов для постройки capstone. Не “копируй и вставь” — понимай, что и зачем. На каждом этапе debug-чеклист: что должно работать перед переходом к следующему.


Этап 1: Skeleton compose с Kafka + ClickHouse

Старт — два сервиса. Пока без producer, без Spark.

# compose.yml
networks:
  de-net:
    driver: bridge

services:
  kafka:
    image: bitnami/kafka:3.8
    container_name: kafka
    ports:
      - "29092:29092"
    environment:
      KAFKA_CFG_NODE_ID: 1
      KAFKA_CFG_PROCESS_ROLES: controller,broker
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:29092
      KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,EXTERNAL://localhost:29092
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true'
    volumes:
      - kafka-data:/bitnami/kafka
    healthcheck:
      test: ["CMD-SHELL", "kafka-topics.sh --bootstrap-server localhost:9092 --list || exit 1"]
      interval: 10s
      timeout: 5s
      retries: 10
      start_period: 20s
    networks: [de-net]

  clickhouse:
    image: clickhouse/clickhouse-server:24.10
    container_name: clickhouse
    ports:
      - "8123:8123"
      - "9000:9000"
    environment:
      CLICKHOUSE_USER: default
      CLICKHOUSE_PASSWORD: ''
      CLICKHOUSE_DB: analytics
      CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT: 1
    ulimits:
      nofile:
        soft: 262144
        hard: 262144
    volumes:
      - ch-data:/var/lib/clickhouse
      - ./clickhouse/init:/docker-entrypoint-initdb.d
    healthcheck:
      test: ["CMD", "wget", "--no-verbose", "--tries=1", "--spider", "http://localhost:8123/ping"]
      interval: 10s
      timeout: 5s
      retries: 5
    networks: [de-net]

volumes:
  kafka-data:
  ch-data:

Schema:

-- clickhouse/init/01-schema.sql
CREATE TABLE IF NOT EXISTS analytics.events_minute
(
    minute DateTime,
    event_type LowCardinality(String),
    cnt UInt64,
    unique_users UInt64
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(minute)
ORDER BY (minute, event_type);
mkdir -p clickhouse/init
# создай 01-schema.sql выше

docker compose up -d
docker compose ps
# kafka и clickhouse должны быть Up (healthy) через ~30 сек

Debug чеклист этап 1:

  • docker compose ps — оба сервиса (healthy).
  • Kafka: docker compose exec kafka kafka-topics.sh --bootstrap-server localhost:9092 --list — возвращает (пустой список или __consumer_offsets).
  • ClickHouse: curl http://localhost:8123/ping — “Ok.”
  • ClickHouse schema: docker compose exec clickhouse clickhouse-client --query "SHOW TABLES FROM analytics" — содержит events_minute.

Если что-то не работает — читай docker compose logs <service> перед тем, как идти дальше.


Этап 2: Producer на Python + Faker

Kafka log segments: как хранятся сообщения на диске

Создай producer/ с Dockerfile и кодом.

# producer/Dockerfile
FROM python:3.11 AS builder
WORKDIR /app
COPY requirements.txt .
RUN pip install --user --no-cache-dir -r requirements.txt

FROM python:3.11-slim
WORKDIR /app
COPY --from=builder /root/.local /root/.local
COPY src/ ./src/
ENV PATH=/root/.local/bin:$PATH \
    PYTHONUNBUFFERED=1
USER nobody
CMD ["python", "-m", "src.producer"]
# producer/requirements.txt
confluent-kafka==2.5.3
faker==30.8.2
# producer/src/__init__.py (пустой)
# producer/src/producer.py
import json
import os
import random
import time
from confluent_kafka import Producer
from faker import Faker

fake = Faker()
EVENT_TYPES = ["click", "view", "purchase", "scroll"]

KAFKA_BOOTSTRAP = os.environ.get("KAFKA_BOOTSTRAP", "kafka:9092")
TOPIC = os.environ.get("TOPIC", "events")
EVENTS_PER_SECOND = int(os.environ.get("EVENTS_PER_SECOND", "10"))

def delivery_report(err, msg):
    if err is not None:
        print(f"Delivery FAILED: {err}", flush=True)

def main():
    producer = Producer({"bootstrap.servers": KAFKA_BOOTSTRAP})
    print(f"Connected to {KAFKA_BOOTSTRAP}, producing to {TOPIC} at {EVENTS_PER_SECOND}/sec", flush=True)

    sleep_interval = 1.0 / EVENTS_PER_SECOND
    counter = 0

    while True:
        event = {
            "user_id": random.randint(1, 1000),
            "event_type": random.choice(EVENT_TYPES),
            "event_time": int(time.time()),
            "session_id": fake.uuid4(),
        }
        producer.produce(
            TOPIC,
            key=str(event["user_id"]),
            value=json.dumps(event).encode("utf-8"),
            callback=delivery_report,
        )
        producer.poll(0)
        counter += 1
        if counter % 100 == 0:
            print(f"Produced {counter} events", flush=True)
        time.sleep(sleep_interval)

if __name__ == "__main__":
    main()

Добавь в compose.yml:

services:
  event-producer:
    build: ./producer
    container_name: event-producer
    environment:
      KAFKA_BOOTSTRAP: kafka:9092
      TOPIC: events
      EVENTS_PER_SECOND: 10
    depends_on:
      kafka:
        condition: service_healthy
    restart: unless-stopped
    networks: [de-net]
docker compose build event-producer
docker compose up -d event-producer
docker compose logs -f event-producer
# Должны быть строки:
# Connected to kafka:9092, producing to events at 10/sec
# Produced 100 events
# Produced 200 events

Сейчас сообщения летят в Kafka, но никто не читает. Проверь:

docker compose exec kafka kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic events --max-messages 5
# Должны увидеть JSON

Debug чеклист этап 2:

  • docker images показывает event-producer < 200 MB.
  • Контейнер event-producer Up (не падает в loop).
  • Логи producer’а каждые 10 секунд выдают “Produced N events”.
  • kafka-console-consumer получает сообщения.

Этап 3: Spark streaming job

Spark в compose у нас уже из урока 16-03. Адаптируем:

# в compose.yml
services:
  spark-master:
    image: bitnami/spark:3.5
    container_name: spark-master
    environment:
      SPARK_MODE: master
      SPARK_RPC_AUTHENTICATION_ENABLED: 'no'
      SPARK_SSL_ENABLED: 'no'
    ports:
      - "8080:8080"
      - "7077:7077"
    volumes:
      - ./spark/jobs:/opt/jobs
      - spark-checkpoint:/opt/checkpoints
    networks: [de-net]

  spark-worker:
    image: bitnami/spark:3.5
    environment:
      SPARK_MODE: worker
      SPARK_MASTER_URL: spark://spark-master:7077
      SPARK_WORKER_MEMORY: 2G
      SPARK_WORKER_CORES: 2
    depends_on:
      - spark-master
    volumes:
      - ./spark/jobs:/opt/jobs
      - spark-checkpoint:/opt/checkpoints
    networks: [de-net]
    deploy:
      replicas: 2

volumes:
  spark-checkpoint:

Spark job:

# spark/jobs/streaming_aggregation.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    from_json, col, window, count as cnt_func, approx_count_distinct,
    to_timestamp, from_unixtime
)
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, LongType

spark = (
    SparkSession.builder
    .appName("UserEventsStreaming")
    .config("spark.sql.shuffle.partitions", "4")
    .getOrCreate()
)

schema = StructType([
    StructField("user_id", IntegerType()),
    StructField("event_type", StringType()),
    StructField("event_time", LongType()),
    StructField("session_id", StringType()),
])

# 1. Read from Kafka
events = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "events")
    .option("startingOffsets", "earliest")
    .load()
)

# 2. Parse JSON
parsed = (
    events
    .selectExpr("CAST(value AS STRING) as json")
    .select(from_json(col("json"), schema).alias("data"))
    .select("data.*")
    .withColumn("event_ts", to_timestamp(from_unixtime(col("event_time"))))
)

# 3. Aggregate by minute window
agg = (
    parsed
    .withWatermark("event_ts", "2 minutes")
    .groupBy(
        window(col("event_ts"), "1 minute"),
        col("event_type"),
    )
    .agg(
        cnt_func("*").alias("cnt"),
        approx_count_distinct("user_id").alias("unique_users"),
    )
    .selectExpr(
        "window.start as minute",
        "event_type",
        "cnt",
        "unique_users",
    )
)

# 4. Write to ClickHouse via JDBC (foreachBatch)
def write_to_clickhouse(batch_df, batch_id):
    print(f"Writing batch {batch_id}, rows: {batch_df.count()}", flush=True)
    (
        batch_df.write
        .format("jdbc")
        .option("driver", "com.clickhouse.jdbc.ClickHouseDriver")
        .option("url", "jdbc:clickhouse://clickhouse:8123/analytics")
        .option("dbtable", "events_minute")
        .option("user", "default")
        .option("password", "")
        .mode("append")
        .save()
    )

query = (
    agg.writeStream
    .outputMode("update")
    .foreachBatch(write_to_clickhouse)
    .option("checkpointLocation", "/opt/checkpoints/events_minute")
    .trigger(processingTime="30 seconds")
    .start()
)

query.awaitTermination()

Submit:

docker compose exec spark-master spark-submit \
  --master spark://spark-master:7077 \
  --total-executor-cores 4 \
  --executor-memory 1G \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3,com.clickhouse:clickhouse-jdbc:0.6.5,com.clickhouse:clickhouse-http-client:0.6.5 \
  /opt/jobs/streaming_aggregation.py

При первом запуске Spark скачает JAR’ы (Kafka connector + ClickHouse JDBC). Это 2-3 минуты. Кэшируется в .ivy2 volume.

Debug чеклист этап 3:

  • Spark master UI на http://localhost:8080 — 2 workers ALIVE.
  • После submit — “UserEventsStreaming” в Running applications.
  • В логах spark-submit видишь “Writing batch N, rows: M”.
  • clickhouse-client --query "SELECT count() FROM analytics.events_minute" — растёт каждые ~30 секунд.

Если падает с ClassNotFoundException com.clickhouse.jdbc.ClickHouseDriver — проверь, что --packages правильный. Если падает с Kafka authentication errors — проверь, что kafka.bootstrap.servers=kafka:9092 (internal listener, не localhost).


Этап 4: Grafana с дашбордом

# в compose.yml
services:
  grafana:
    image: grafana/grafana:11.3.1
    container_name: grafana
    ports:
      - "3000:3000"
    environment:
      GF_SECURITY_ADMIN_PASSWORD: admin
      GF_INSTALL_PLUGINS: grafana-clickhouse-datasource
      GF_USERS_ALLOW_SIGN_UP: 'false'
    volumes:
      - grafana-data:/var/lib/grafana
      - ./grafana/provisioning:/etc/grafana/provisioning:ro
    depends_on:
      clickhouse:
        condition: service_healthy
    networks: [de-net]

volumes:
  grafana-data:

Datasource provisioning:

# grafana/provisioning/datasources/clickhouse.yml
apiVersion: 1
datasources:
  - name: ClickHouse
    type: grafana-clickhouse-datasource
    jsonData:
      server: clickhouse
      port: 9000
      protocol: native
      defaultDatabase: analytics
      username: default
    secureJsonData:
      password: ''
    isDefault: true

Dashboard provisioning config:

# grafana/provisioning/dashboards/dashboards.yml
apiVersion: 1
providers:
  - name: 'default'
    orgId: 1
    folder: ''
    type: file
    options:
      path: /etc/grafana/provisioning/dashboards

Dashboard JSON (минимальный):

# grafana/provisioning/dashboards/events-dashboard.json
{
  "title": "User Events",
  "uid": "user-events",
  "schemaVersion": 38,
  "version": 1,
  "panels": [
    {
      "id": 1,
      "title": "Events per minute",
      "type": "timeseries",
      "gridPos": {"h": 8, "w": 12, "x": 0, "y": 0},
      "datasource": {"type": "grafana-clickhouse-datasource", "uid": "clickhouse"},
      "targets": [{
        "rawSql": "SELECT minute as time, event_type, sum(cnt) as count FROM analytics.events_minute WHERE $__timeFilter(minute) GROUP BY minute, event_type ORDER BY minute",
        "format": "time_series"
      }]
    },
    {
      "id": 2,
      "title": "Unique users per minute",
      "type": "timeseries",
      "gridPos": {"h": 8, "w": 12, "x": 12, "y": 0},
      "datasource": {"type": "grafana-clickhouse-datasource", "uid": "clickhouse"},
      "targets": [{
        "rawSql": "SELECT minute as time, event_type, sum(unique_users) as users FROM analytics.events_minute WHERE $__timeFilter(minute) GROUP BY minute, event_type ORDER BY minute"
      }]
    }
  ]
}
docker compose up -d grafana

open http://localhost:3000   # admin/admin
# Dashboards -> User Events -- должны видеть данные за последние 15 минут

Debug чеклист этап 4:

  • Grafana доступна на http://localhost:3000.
  • Datasource “ClickHouse” не в красном (Configuration -> Datasources).
  • Дашборд “User Events” импортирован автоматически.
  • Графики показывают данные (если producer работает >2 минуты).

Этап 5: финальные штрихи

.env и .env.example

# .env
CLICKHOUSE_PASSWORD=
GRAFANA_ADMIN_PASSWORD=admin
EVENTS_PER_SECOND=10
# в compose.yml
event-producer:
  environment:
    EVENTS_PER_SECOND: ${EVENTS_PER_SECOND:-10}

grafana:
  environment:
    GF_SECURITY_ADMIN_PASSWORD: ${GRAFANA_ADMIN_PASSWORD:-admin}

Makefile

.PHONY: up down logs ps reset

up:
	docker compose up -d --wait

down:
	docker compose down

down-v:
	docker compose down -v

logs:
	docker compose logs -f

ps:
	docker compose ps

submit-spark:
	docker compose exec -d spark-master spark-submit \
		--master spark://spark-master:7077 \
		--total-executor-cores 4 \
		--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3,com.clickhouse:clickhouse-jdbc:0.6.5,com.clickhouse:clickhouse-http-client:0.6.5 \
		/opt/jobs/streaming_aggregation.py

reset: down-v up
	sleep 30
	$(MAKE) submit-spark

dashboard:
	open http://localhost:3000

verify:
	@docker compose exec -T clickhouse clickhouse-client \
		--query "SELECT count(), min(minute), max(minute) FROM analytics.events_minute"

Полный поток запуска

# 1. clone репо

# 2. config
cp .env.example .env

# 3. build
docker compose build

# 4. up
make up

# 5. wait for healthy
sleep 30

# 6. submit Spark job
make submit-spark

# 7. wait for aggregates
sleep 90

# 8. verify
make verify
# 5  2026-05-15 14:23:00  2026-05-15 14:27:00

# 9. dashboard
make dashboard

Типичные ошибки и debug

1. event-producer падает с “Connection refused”: kafka не healthy ещё. Добавь depends_on: kafka: condition: service_healthy.

2. spark-submit падает с “ClassNotFoundException kafka”: забыл --packages spark-sql-kafka-0-10. Версия должна совпадать с Spark (3.5.x).

3. spark пишет в ClickHouse, но events_minute пустая: проверь, что watermark и trigger срабатывают. Если EVENTS_PER_SECOND слишком низкий (<1), может не хватать данных для batch. Trigger=30s — первый batch будет через 30 сек после стартуа.

4. Grafana показывает “No data”: проверь datasource (Configuration). Если plugin grafana-clickhouse-datasource не установился (видишь “Unknown plugin”), docker compose logs grafana покажет ошибку. Иногда нужно явно установить: GF_INSTALL_PLUGINS=grafana-clickhouse-datasource.

5. После docker compose restart Spark теряет offset: значит, checkpoint volume не работает. Проверь: volumes: [spark-checkpoint:/opt/checkpoints] и checkpointLocation=/opt/checkpoints/events_minute.


Что осталось

После этого урока у тебя должен быть работающий стенд. В следующем уроке — детальная валидация: что именно проверить, какие edge cases ловить.


Проверка знанийKnowledge check
Ты запустил spark-submit, в логах spark-submit видишь "Batch 0 processed, rows: 0", потом "Batch 1 processed, rows: 0" и так далее. Producer работает, kafka-console-consumer видит сообщения. Что не так со Spark?
ОтветAnswer
Скорее всего, проблема с offset. Spark по умолчанию startingOffsets=latest -- читает только новые сообщения, появившиеся после старта streaming-job. Если producer работал раньше, и сейчас "тихий момент" (между батчами), Spark ничего не получает. Решения: (1) Установи option("startingOffsets", "earliest") -- читает все сообщения с начала. Подходит для capstone. (2) Или подожди дольше -- producer выдаёт 10/сек, через 30 секунд batch будет 300 сообщений. (3) Проверь, что spark подключается к правильному Kafka URL: kafka:9092 (internal), не localhost:29092 (external). Если localhost:29092 -- spark изнутри compose-сети не сможет подключиться, но при этом ошибки connection refused, не "0 rows".

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. При запуске spark-submit для streaming job, который читает Kafka, какие два --packages обязательно нужны?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4