Установка Flink 2.2 и первый job
Хватит абстракций — давайте запустим Flink. К концу этого урока у вас будет работающий локальный Flink 2.2 кластер вместе с Kafka, вы запустите классический WordCount job и научитесь читать главные элементы Web UI: job graph, parallelism, метрики, checkpoints.
Никакой Java-кода писать в этом уроке не будем — это придёт в модуле 03. Сейчас цель — собрать окружение, запустить готовый пример, и научиться читать Web UI. Это даст вам интуицию для всех последующих модулей.
Что нам понадобится
- Docker и docker compose v2 (Docker Desktop, OrbStack или CLI Docker Engine).
- Минимум 8 GB RAM свободной (Flink JobManager + 2 TaskManager + Kafka + ZooKeeper-less Kraft brokers).
- 10 GB свободного дискового пространства.
- Браузер для Flink Web UI (рекомендую Firefox или Chrome).
- Опционально:
kafkactlилиkcatдля проверки топиков из CLI.
Если Docker Desktop потребляет слишком много памяти, рекомендую OrbStack на Mac (значительно эффективнее) или нативный Docker Engine на Linux.
Структура локального окружения
Мы развернём:
JobManager (1)
Flink JobManager (1 экземпляр) — координатор кластера. Принимает submit, планирует задачи, управляет checkpoints. В production — 2 JobManager в HA-режиме, для локальной разработки достаточно 1.TaskManager 1
TaskManager — worker. Выполняет реальную обработку данных. Имеет slots (по умолчанию 2 в нашей конфигурации). 2 TaskManager x 2 slot = 4 параллельных задачи максимум.TaskManager 2
Второй TaskManager — для демонстрации parallelism > 1 и распределения. В production количество TaskManager определяется нагрузкой; на Kubernetes их количеством управляет операторская скейлинг-политика.Kafka (KRaft)
Kafka брокер в KRaft mode (без ZooKeeper). Apache Kafka 3.7+ работает без ZooKeeper; для локальной разработки достаточно 1 брокера. Топики hand-on демо: input-words, output-counts.Это минимальная конфигурация — для production добавляются HA для JobManager, multi-broker Kafka с replication, отдельный state backend storage, и так далее. Но для изучения этого достаточно.
Apache Kafka: distributed commit log — базовый курс по архитектуреШаг 1: docker-compose.yml
Создайте директорию для проекта и в ней файл docker-compose.yml:
services:
jobmanager:
image: flink:2.2.0-scala_2.12-java17
container_name: flink-jobmanager
ports:
- "8081:8081"
command: jobmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
rest.flamegraph.enabled: true
state.checkpoints.dir: file:///tmp/flink-checkpoints
state.savepoints.dir: file:///tmp/flink-savepoints
execution.checkpointing.interval: 60s
execution.checkpointing.mode: EXACTLY_ONCE
volumes:
- flink-checkpoints:/tmp/flink-checkpoints
- flink-savepoints:/tmp/flink-savepoints
networks:
- flink-net
taskmanager-1:
image: flink:2.2.0-scala_2.12-java17
container_name: flink-tm-1
depends_on:
- jobmanager
command: taskmanager
scale: 1
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager.numberOfTaskSlots: 2
taskmanager.memory.process.size: 1728m
volumes:
- flink-checkpoints:/tmp/flink-checkpoints
- flink-savepoints:/tmp/flink-savepoints
networks:
- flink-net
taskmanager-2:
image: flink:2.2.0-scala_2.12-java17
container_name: flink-tm-2
depends_on:
- jobmanager
command: taskmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager.numberOfTaskSlots: 2
taskmanager.memory.process.size: 1728m
volumes:
- flink-checkpoints:/tmp/flink-checkpoints
- flink-savepoints:/tmp/flink-savepoints
networks:
- flink-net
kafka:
image: apache/kafka:3.9.0
container_name: kafka
ports:
- "9092:9092"
- "9093:9093"
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
CLUSTER_ID: flink-course-cluster
networks:
- flink-net
volumes:
flink-checkpoints:
flink-savepoints:
networks:
flink-net:
driver: bridge
Несколько важных моментов:
- flink:2.2.0-scala_2.12-java17 — официальный образ. Scala 2.12 — это только потому, что Scala API ещё компилируется на 2.12; для Java-кода это не имеет значения. Java 17 — минимальная требуемая версия для Flink 2.x.
- taskmanager.numberOfTaskSlots: 2 — каждый TaskManager имеет 2 slot. Два TaskManager = 4 параллельных задачи максимум. Этого достаточно для всех примеров курса до модуля 15.
- rest.flamegraph.enabled: true — включает flame graphs в Web UI (полезный профайлер на лету, недоступен по умолчанию).
- execution.checkpointing.interval: 60s — каждые 60 секунд Flink снимает чекпоинт. В production интервал зависит от tolerance к потере данных при failure.
- apache/kafka:3.9.0 — Kafka в KRaft mode (без ZooKeeper). Один брокер для локальной разработки.
В production НИКОГДА не используйте file:// для checkpoint/savepoint dir — это работает только локально. На K8s используется S3, GCS, Azure Blob, HDFS или CSI volume.
Шаг 2: Запуск стека
В директории с docker-compose.yml:
docker compose up -d
Проверяем, что всё поднялось:
docker compose ps
Ожидаемый вывод:
NAME IMAGE STATUS PORTS
flink-jobmanager flink:2.2.0-scala_2.12-java17 Up 30 seconds 0.0.0.0:8081->8081/tcp
flink-tm-1 flink:2.2.0-scala_2.12-java17 Up 28 seconds
flink-tm-2 flink:2.2.0-scala_2.12-java17 Up 28 seconds
kafka apache/kafka:3.9.0 Up 30 seconds 0.0.0.0:9092->9092/tcp
Откройте Web UI: http://localhost:8081. Должны увидеть Flink Dashboard. В левом меню — пункты Overview, Jobs (Running, Completed), Task Managers, Job Manager.
В разделе Overview должно быть видно:
Available Task Slots: 4
Task Managers: 2
Total Task Slots: 4
Если slots не 4 — TaskManager не зарегистрировались. Проверьте docker compose logs taskmanager-1 на ошибки.
Шаг 3: Создаём топики
Создаём входной и выходной топики для WordCount:
docker exec -it kafka kafka-topics.sh \
--bootstrap-server localhost:9092 \
--create --topic input-words \
--partitions 2 --replication-factor 1
docker exec -it kafka kafka-topics.sh \
--bootstrap-server localhost:9092 \
--create --topic output-counts \
--partitions 2 --replication-factor 1
Проверяем:
docker exec -it kafka kafka-topics.sh \
--bootstrap-server localhost:9092 --list
Должны увидеть input-words и output-counts.
Шаг 4: Первый job — WordCount
Flink идёт с готовыми примерами в образе. Самый простой — WordCount.jar. Запустим его на статическом тексте сначала, чтобы убедиться, что кластер работает:
docker exec -it flink-jobmanager flink run \
/opt/flink/examples/streaming/WordCount.jar \
--output /tmp/wordcount-output
Ожидаемый вывод:
Job has been submitted with JobID a1b2c3d4e5f6...
Program execution finished
Job with JobID a1b2c3d4e5f6... has finished.
Job Runtime: 1234 ms
Это batch-mode (built-in DataStream example). Текст внутри JAR — заранее известная фраза. Цель — убедиться, что submit-pipeline работает.
Идём в Web UI: Jobs / Completed Jobs. Кликаем на наш job. Видим job graph: Source -> FlatMap -> Sum.
Шаг 5: Streaming WordCount с Kafka
Теперь интереснее: запустим streaming WordCount, который читает из Kafka. Flink 2.2 идёт с конвертером “SocketWindowWordCount”, но это TCP socket — нам нужен Kafka.
Создадим собственный WordCount, который читает из топика. Для этого скачаем Kafka connector JAR и положим в lib:
# Скачиваем flink-sql-connector-kafka
docker exec -it flink-jobmanager \
curl -L -o /opt/flink/lib/flink-sql-connector-kafka-3.4.0-1.20.jar \
https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.4.0-1.20/flink-sql-connector-kafka-3.4.0-1.20.jar
Перезапускаем кластер, чтобы JAR подхватился:
docker compose restart jobmanager taskmanager-1 taskmanager-2
Теперь используем Flink SQL Client как самый быстрый способ запустить streaming WordCount:
docker exec -it flink-jobmanager /opt/flink/bin/sql-client.sh
В SQL Client создаём source и sink:
CREATE TABLE words_source (
word STRING,
ts TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'input-words',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'wordcount-demo',
'scan.startup.mode' = 'latest-offset',
'format' = 'csv'
);
CREATE TABLE counts_sink (
word STRING,
cnt BIGINT,
PRIMARY KEY (word) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'output-counts',
'properties.bootstrap.servers' = 'kafka:9092',
'key.format' = 'csv',
'value.format' = 'csv'
);
INSERT INTO counts_sink
SELECT word, COUNT(*) AS cnt
FROM words_source
GROUP BY word;
После INSERT INTO SQL Client покажет: Job ID: <jobid>. Идём в Web UI, Jobs / Running Jobs — видим запущенный job.
Шаг 6: Отправляем данные
В новом терминале — producer:
docker exec -it kafka kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic input-words
Вводим слова, по одному в строке:
flink
kafka
flink
streaming
flink
kafka
В другом терминале — consumer на output:
docker exec -it kafka kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic output-counts \
--from-beginning
Видим, как агрегаты обновляются:
flink,1
kafka,1
flink,2
streaming,1
flink,3
kafka,2
Поздравляю — это работающий streaming pipeline в Flink. Каждое слово увеличивает счётчик в state, и обновление пишется в выходной топик.
Шаг 7: Обзор Web UI
Откройте Web UI на http://localhost:8081 и кликните на running job. Вот ключевые элементы, на которые стоит обратить внимание:
Самые полезные разделы для повседневной работы:
Job Graph — первое, что вы открываете, когда что-то “не работает”. Сразу видно, какой оператор в FAILED state.
Checkpoints — критично для понимания производительности. Если checkpoints не завершаются — job не сможет восстановиться после failure. Размер state растёт — это сигнал, что у вас memory leak в state.
Backpressure — главный диагностический инструмент для performance. HIGH backpressure на оператор означает, что down-stream не успевает. Решения: увеличить parallelism, профайлить медленный оператор, оптимизировать sink.
TaskManager / Logs — куда вы идёте, когда job упал. Здесь stacktrace.
Шаг 8: Останавливаем job правильно
Не нажимайте Ctrl+C. Правильная остановка с сохранением state:
docker exec -it flink-jobmanager flink list
Это покажет running jobs с их IDs. Скопируйте ID и сделайте savepoint + stop:
docker exec -it flink-jobmanager flink stop \
--savepointPath file:///tmp/flink-savepoints \
<JOB_ID>
Это создаст savepoint и корректно остановит job. После рестарта вы сможете запустить job с этого savepoint и продолжить с того же state.
Шаг 9: Остановка стека
Когда закончили:
# Сохраняем volumes (checkpoints, savepoints)
docker compose down
# Или полная очистка с удалением данных
docker compose down -v
Если вы планируете часто запускать-останавливать стек, не используйте -v — checkpoints на диске пригодятся для экспериментов с восстановлением после failure.
Возможные проблемы
JobManager не стартует, ошибка “Cannot bind to port 8081” — значит у вас уже что-то на 8081. Поменяйте маппинг порта: "8082:8081" и откройте http://localhost:8082.
TaskManager не регистрируются (Available Task Slots: 0) — проверьте jobmanager.rpc.address: jobmanager. Имя должно совпадать с именем сервиса в docker-compose.yml.
Kafka connector “ClassNotFoundException” — забыли скопировать flink-sql-connector-kafka-*.jar в /opt/flink/lib/ ИЛИ забыли рестартовать кластер после копирования.
Out of Memory — увеличьте taskmanager.memory.process.size до 2g или больше. По умолчанию 1728MB — впритык для серьёзных задач.
Job упал с “TimerService is disabled” — забыли указать checkpoint backend. Добавьте в FLINK_PROPERTIES JobManager: state.backend: filesystem.
Попробуй сам
- Запустите стек по инструкции выше.
- Запустите SQL WordCount.
- Отправьте 50 разных слов в
input-words. - Откройте Web UI и найдите ваш job. Зафиксируйте:
- Parallelism оператора GroupAggregate.
- Размер checkpoint (Checkpoints tab).
- Throughput (records/sec) на Source.
- Остановите job через
flink stopс savepoint. Запустите снова с этого savepoint командой:
Заметьте, как state восстановился — счётчики продолжаются с того места, где остановились.flink run -s <savepoint-path> /opt/flink/examples/streaming/WordCount.jar
Это hands-on основа всего курса. Если что-то не получилось — пишите в Telegram-группу с скриншотом ошибки и docker compose logs.