Learning Platform
Глоссарий Troubleshooting
Урок 01.03 · 25 мин
Средний
SetupDockerFlink 2.2Web UIWordCount

Установка Flink 2.2 и первый job

Хватит абстракций — давайте запустим Flink. К концу этого урока у вас будет работающий локальный Flink 2.2 кластер вместе с Kafka, вы запустите классический WordCount job и научитесь читать главные элементы Web UI: job graph, parallelism, метрики, checkpoints.

Никакой Java-кода писать в этом уроке не будем — это придёт в модуле 03. Сейчас цель — собрать окружение, запустить готовый пример, и научиться читать Web UI. Это даст вам интуицию для всех последующих модулей.


Что нам понадобится

  • Docker и docker compose v2 (Docker Desktop, OrbStack или CLI Docker Engine).
  • Минимум 8 GB RAM свободной (Flink JobManager + 2 TaskManager + Kafka + ZooKeeper-less Kraft brokers).
  • 10 GB свободного дискового пространства.
  • Браузер для Flink Web UI (рекомендую Firefox или Chrome).
  • Опционально: kafkactl или kcat для проверки топиков из CLI.

Если Docker Desktop потребляет слишком много памяти, рекомендую OrbStack на Mac (значительно эффективнее) или нативный Docker Engine на Linux.


Структура локального окружения

Мы развернём:

Локальный стек: Flink + Kafka

JobManager (1)

Flink JobManager (1 экземпляр) — координатор кластера. Принимает submit, планирует задачи, управляет checkpoints. В production — 2 JobManager в HA-режиме, для локальной разработки достаточно 1.
распределяет задачи

TaskManager 1

TaskManager — worker. Выполняет реальную обработку данных. Имеет slots (по умолчанию 2 в нашей конфигурации). 2 TaskManager x 2 slot = 4 параллельных задачи максимум.

TaskManager 2

Второй TaskManager — для демонстрации parallelism > 1 и распределения. В production количество TaskManager определяется нагрузкой; на Kubernetes их количеством управляет операторская скейлинг-политика.
читают/пишут

Kafka (KRaft)

Kafka брокер в KRaft mode (без ZooKeeper). Apache Kafka 3.7+ работает без ZooKeeper; для локальной разработки достаточно 1 брокера. Топики hand-on демо: input-words, output-counts.

Это минимальная конфигурация — для production добавляются HA для JobManager, multi-broker Kafka с replication, отдельный state backend storage, и так далее. Но для изучения этого достаточно.

Apache Kafka: distributed commit log — базовый курс по архитектуре

Шаг 1: docker-compose.yml

Создайте директорию для проекта и в ней файл docker-compose.yml:

services:
  jobmanager:
    image: flink:2.2.0-scala_2.12-java17
    container_name: flink-jobmanager
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        rest.flamegraph.enabled: true
        state.checkpoints.dir: file:///tmp/flink-checkpoints
        state.savepoints.dir: file:///tmp/flink-savepoints
        execution.checkpointing.interval: 60s
        execution.checkpointing.mode: EXACTLY_ONCE
    volumes:
      - flink-checkpoints:/tmp/flink-checkpoints
      - flink-savepoints:/tmp/flink-savepoints
    networks:
      - flink-net

  taskmanager-1:
    image: flink:2.2.0-scala_2.12-java17
    container_name: flink-tm-1
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
        taskmanager.memory.process.size: 1728m
    volumes:
      - flink-checkpoints:/tmp/flink-checkpoints
      - flink-savepoints:/tmp/flink-savepoints
    networks:
      - flink-net

  taskmanager-2:
    image: flink:2.2.0-scala_2.12-java17
    container_name: flink-tm-2
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
        taskmanager.memory.process.size: 1728m
    volumes:
      - flink-checkpoints:/tmp/flink-checkpoints
      - flink-savepoints:/tmp/flink-savepoints
    networks:
      - flink-net

  kafka:
    image: apache/kafka:3.9.0
    container_name: kafka
    ports:
      - "9092:9092"
      - "9093:9093"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      CLUSTER_ID: flink-course-cluster
    networks:
      - flink-net

volumes:
  flink-checkpoints:
  flink-savepoints:

networks:
  flink-net:
    driver: bridge

Несколько важных моментов:

  • flink:2.2.0-scala_2.12-java17 — официальный образ. Scala 2.12 — это только потому, что Scala API ещё компилируется на 2.12; для Java-кода это не имеет значения. Java 17 — минимальная требуемая версия для Flink 2.x.
  • taskmanager.numberOfTaskSlots: 2 — каждый TaskManager имеет 2 slot. Два TaskManager = 4 параллельных задачи максимум. Этого достаточно для всех примеров курса до модуля 15.
  • rest.flamegraph.enabled: true — включает flame graphs в Web UI (полезный профайлер на лету, недоступен по умолчанию).
  • execution.checkpointing.interval: 60s — каждые 60 секунд Flink снимает чекпоинт. В production интервал зависит от tolerance к потере данных при failure.
  • apache/kafka:3.9.0 — Kafka в KRaft mode (без ZooKeeper). Один брокер для локальной разработки.
WARNING

В production НИКОГДА не используйте file:// для checkpoint/savepoint dir — это работает только локально. На K8s используется S3, GCS, Azure Blob, HDFS или CSI volume.


Шаг 2: Запуск стека

В директории с docker-compose.yml:

docker compose up -d

Проверяем, что всё поднялось:

docker compose ps

Ожидаемый вывод:

NAME              IMAGE                                STATUS         PORTS
flink-jobmanager  flink:2.2.0-scala_2.12-java17        Up 30 seconds  0.0.0.0:8081->8081/tcp
flink-tm-1        flink:2.2.0-scala_2.12-java17        Up 28 seconds
flink-tm-2        flink:2.2.0-scala_2.12-java17        Up 28 seconds
kafka             apache/kafka:3.9.0                   Up 30 seconds  0.0.0.0:9092->9092/tcp

Откройте Web UI: http://localhost:8081. Должны увидеть Flink Dashboard. В левом меню — пункты Overview, Jobs (Running, Completed), Task Managers, Job Manager.

В разделе Overview должно быть видно:

Available Task Slots: 4
Task Managers: 2
Total Task Slots: 4

Если slots не 4 — TaskManager не зарегистрировались. Проверьте docker compose logs taskmanager-1 на ошибки.


Шаг 3: Создаём топики

Создаём входной и выходной топики для WordCount:

docker exec -it kafka kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --create --topic input-words \
  --partitions 2 --replication-factor 1

docker exec -it kafka kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --create --topic output-counts \
  --partitions 2 --replication-factor 1

Проверяем:

docker exec -it kafka kafka-topics.sh \
  --bootstrap-server localhost:9092 --list

Должны увидеть input-words и output-counts.


Шаг 4: Первый job — WordCount

Flink идёт с готовыми примерами в образе. Самый простой — WordCount.jar. Запустим его на статическом тексте сначала, чтобы убедиться, что кластер работает:

docker exec -it flink-jobmanager flink run \
  /opt/flink/examples/streaming/WordCount.jar \
  --output /tmp/wordcount-output

Ожидаемый вывод:

Job has been submitted with JobID a1b2c3d4e5f6...
Program execution finished
Job with JobID a1b2c3d4e5f6... has finished.
Job Runtime: 1234 ms

Это batch-mode (built-in DataStream example). Текст внутри JAR — заранее известная фраза. Цель — убедиться, что submit-pipeline работает.

Идём в Web UI: Jobs / Completed Jobs. Кликаем на наш job. Видим job graph: Source -> FlatMap -> Sum.


Шаг 5: Streaming WordCount с Kafka

Теперь интереснее: запустим streaming WordCount, который читает из Kafka. Flink 2.2 идёт с конвертером “SocketWindowWordCount”, но это TCP socket — нам нужен Kafka.

Создадим собственный WordCount, который читает из топика. Для этого скачаем Kafka connector JAR и положим в lib:

# Скачиваем flink-sql-connector-kafka
docker exec -it flink-jobmanager \
  curl -L -o /opt/flink/lib/flink-sql-connector-kafka-3.4.0-1.20.jar \
  https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.4.0-1.20/flink-sql-connector-kafka-3.4.0-1.20.jar

Перезапускаем кластер, чтобы JAR подхватился:

docker compose restart jobmanager taskmanager-1 taskmanager-2

Теперь используем Flink SQL Client как самый быстрый способ запустить streaming WordCount:

docker exec -it flink-jobmanager /opt/flink/bin/sql-client.sh

В SQL Client создаём source и sink:

CREATE TABLE words_source (
  word STRING,
  ts TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  'topic' = 'input-words',
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'wordcount-demo',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'csv'
);

CREATE TABLE counts_sink (
  word STRING,
  cnt BIGINT,
  PRIMARY KEY (word) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'output-counts',
  'properties.bootstrap.servers' = 'kafka:9092',
  'key.format' = 'csv',
  'value.format' = 'csv'
);

INSERT INTO counts_sink
SELECT word, COUNT(*) AS cnt
FROM words_source
GROUP BY word;

После INSERT INTO SQL Client покажет: Job ID: <jobid>. Идём в Web UI, Jobs / Running Jobs — видим запущенный job.


Шаг 6: Отправляем данные

В новом терминале — producer:

docker exec -it kafka kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic input-words

Вводим слова, по одному в строке:

flink
kafka
flink
streaming
flink
kafka

В другом терминале — consumer на output:

docker exec -it kafka kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic output-counts \
  --from-beginning

Видим, как агрегаты обновляются:

flink,1
kafka,1
flink,2
streaming,1
flink,3
kafka,2

Поздравляю — это работающий streaming pipeline в Flink. Каждое слово увеличивает счётчик в state, и обновление пишется в выходной топик.


Шаг 7: Обзор Web UI

Откройте Web UI на http://localhost:8081 и кликните на running job. Вот ключевые элементы, на которые стоит обратить внимание:

Web UI: ключевые разделы
Job GraphВизуализация job как DAG из операторов. Видно Source, оператор GroupBy/Aggregate, Sink. Цвет вершин — состояние (RUNNING/FAILED). Кликабельно для drill-down.
SubtasksКаждый оператор имеет subtasks по числу parallelism. Например, parallelism=2 на оператор -> 2 subtask. Видны bytes read/written, records, backpressure status.
CheckpointsИстория чекпоинтов: completed, failed, in-progress. Размер state, длительность, выравнивание. Если checkpoints падают или занимают минуты — это сигнал проблем.
BackpressureИндикатор backpressure для каждого оператора: OK / LOW / HIGH. Если HIGH — значит down-stream оператор не успевает обрабатывать; up-stream начинает замедляться.
MetricsReal-time метрики: numRecordsIn, numRecordsOut, throughput, latency. Можно добавить графики для custom метрик. Для production — экспорт в Prometheus.
TaskManager LogsПрямой доступ к логам каждого TaskManager. Если job упал — здесь stacktrace. Для production -- лог-aggregation в Loki, ELK, или CloudWatch.

Самые полезные разделы для повседневной работы:

Job Graph — первое, что вы открываете, когда что-то “не работает”. Сразу видно, какой оператор в FAILED state.

Checkpoints — критично для понимания производительности. Если checkpoints не завершаются — job не сможет восстановиться после failure. Размер state растёт — это сигнал, что у вас memory leak в state.

Backpressure — главный диагностический инструмент для performance. HIGH backpressure на оператор означает, что down-stream не успевает. Решения: увеличить parallelism, профайлить медленный оператор, оптимизировать sink.

TaskManager / Logs — куда вы идёте, когда job упал. Здесь stacktrace.


Шаг 8: Останавливаем job правильно

Не нажимайте Ctrl+C. Правильная остановка с сохранением state:

docker exec -it flink-jobmanager flink list

Это покажет running jobs с их IDs. Скопируйте ID и сделайте savepoint + stop:

docker exec -it flink-jobmanager flink stop \
  --savepointPath file:///tmp/flink-savepoints \
  <JOB_ID>

Это создаст savepoint и корректно остановит job. После рестарта вы сможете запустить job с этого savepoint и продолжить с того же state.


Шаг 9: Остановка стека

Когда закончили:

# Сохраняем volumes (checkpoints, savepoints)
docker compose down

# Или полная очистка с удалением данных
docker compose down -v
TIP

Если вы планируете часто запускать-останавливать стек, не используйте -v — checkpoints на диске пригодятся для экспериментов с восстановлением после failure.


Возможные проблемы

JobManager не стартует, ошибка “Cannot bind to port 8081” — значит у вас уже что-то на 8081. Поменяйте маппинг порта: "8082:8081" и откройте http://localhost:8082.

TaskManager не регистрируются (Available Task Slots: 0) — проверьте jobmanager.rpc.address: jobmanager. Имя должно совпадать с именем сервиса в docker-compose.yml.

Kafka connector “ClassNotFoundException” — забыли скопировать flink-sql-connector-kafka-*.jar в /opt/flink/lib/ ИЛИ забыли рестартовать кластер после копирования.

Out of Memory — увеличьте taskmanager.memory.process.size до 2g или больше. По умолчанию 1728MB — впритык для серьёзных задач.

Job упал с “TimerService is disabled” — забыли указать checkpoint backend. Добавьте в FLINK_PROPERTIES JobManager: state.backend: filesystem.


Попробуй сам

  1. Запустите стек по инструкции выше.
  2. Запустите SQL WordCount.
  3. Отправьте 50 разных слов в input-words.
  4. Откройте Web UI и найдите ваш job. Зафиксируйте:
    • Parallelism оператора GroupAggregate.
    • Размер checkpoint (Checkpoints tab).
    • Throughput (records/sec) на Source.
  5. Остановите job через flink stop с savepoint. Запустите снова с этого savepoint командой:
    flink run -s <savepoint-path> /opt/flink/examples/streaming/WordCount.jar
    Заметьте, как state восстановился — счётчики продолжаются с того места, где остановились.

Это hands-on основа всего курса. Если что-то не получилось — пишите в Telegram-группу с скриншотом ошибки и docker compose logs.

Проверка знанийKnowledge check
Почему правильная остановка Flink-job через 'flink stop' с savepoint важна для production, в отличие от kill процесса или Ctrl+C?
ОтветAnswer
Команда 'flink stop --savepointPath ...' выполняет два критичных действия атомарно: (1) триггерит финальный savepoint — снимок всего state job (keyed state, operator state, Kafka offsets, watermarks); (2) корректно завершает источники, дожидается, пока все записи обработаны, и закрывает sinks (важно для two-phase commit). Простой kill процесса оставляет state в неопределённом виде — checkpoints могут быть промежуточные, Kafka transactional producer может остаться в pending состоянии. Без savepoint вы не сможете обновить версию job без потери state, не сможете мигрировать на другой кластер, и не сможете провести rolling upgrade.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Почему 'flink stop --savepointPath ... <JOB_ID>' предпочтительнее, чем kill процесса или Ctrl+C для остановки production job?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 3