Анатомия Flink-job: DAG, операторы, parallelism
Каждый раз, когда вы вызываете env.execute(), Flink собирает ваш код в JobGraph — направленный ацикличный граф операторов — и отправляет его в кластер. Понимание этой модели — мостик между “написал код” и “понимаю, что происходит в Web UI”. Без неё performance tuning и troubleshooting превращаются в гадание.
К концу этого урока вы сможете посмотреть на job graph в Web UI и сразу видеть: где источник данных, где операторы преобразования, где sink, какой parallelism у каждого, и где между ними keyBy/rebalance/forward стрелки.
Что такое job: pipeline из операторов
Flink job — это dataflow program: source(s) -> transformations -> sink(s). Источник производит данные, операторы преобразуют их, sink записывает результат куда-то ещё. Между ними — потоки данных.
Минимальный пример WordCount словами:
Kafka source -> flatMap (split into words) -> keyBy(word) -> sum(count) -> Kafka sink
Это и есть job. У него есть:
- Source — откуда берутся данные. KafkaSource, FileSource, JdbcSource.
- Transformations — что делается с данными. map, filter, flatMap, keyBy, reduce, window.
- Sink — куда пишется результат. KafkaSink, FileSink, JdbcSink.
В Java-коде это выглядит примерно так:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("kafka:9092")
.setTopics("input-words")
.setGroupId("wordcount-job")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> words = env
.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
.flatMap((String line, Collector<String> out) -> {
for (String word : line.toLowerCase().split("\\s+")) {
out.collect(word);
}
}).returns(Types.STRING);
DataStream<Tuple2<String, Long>> counts = words
.map(w -> Tuple2.of(w, 1L)).returns(Types.TUPLE(Types.STRING, Types.LONG))
.keyBy(t -> t.f0)
.sum(1);
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers("kafka:9092")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("output-counts")
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.build();
counts.map(t -> t.f0 + "," + t.f1).sinkTo(sink);
env.execute("WordCount Job");
env.execute() — момент, когда Flink собирает всё это в JobGraph и отправляет в кластер. Всё, что до этого, — просто построение графа в памяти.
JobGraph: модель выполнения
JobGraph — это DAG (directed acyclic graph) операторов. Вершины — операторы; рёбра — потоки данных между ними.
DAG Scheduler в Spark: похожая концепция, batch-семантикаSource: KafkaSource (parallelism=2)
Source: KafkaSource. Читает из топика input-words. Parallelism в нашем примере = 2 (по числу партиций Kafka, или setParallelism). Каждый subtask читает свой набор партиций.FlatMap: split words (parallelism=2)
FlatMap: разбивает строку на слова. Stateless оператор. Forward partitioning — каждый subtask source отправляет в свой subtask flatMap (нет shuffle).KeyedReduce: sum (parallelism=4)
KeyedReduce: keyBy(word) распределяет события по hash(word) — одинаковое слово всегда попадает в один subtask. Sum обновляет state (count per word). Здесь parallelism может быть другим (например, 4).Sink: KafkaSink (parallelism=2)
Sink: KafkaSink. Пишет в топик output-counts. Parallelism = 2 — соответствует партициям выходного топика для оптимального write.Несколько важных вещей видны на этой диаграмме:
Каждый оператор имеет parallelism — количество параллельных subtasks. По умолчанию parallelism job-уровня (env.setParallelism(2)), но можно переопределить на каждом операторе (stream.keyBy(...).sum(1).setParallelism(4)).
Между операторами разные типы partitioning — forward (1-to-1, без сетевой передачи), hash (keyBy — по хэшу ключа), rebalance (round-robin), broadcast и другие. Это покрывается подробно в модуле 03.
Шаг keyBy критичен — он делает partitioning по hash(key). Каждое слово всегда попадает в один subtask sum, что обеспечивает корректную аггрегацию.
Operator chains: оптимизация
Flink делает оптимизацию: соседние операторы с forward partitioning и одинаковым parallelism склеиваются в operator chain и выполняются в одном thread без сериализации/десериализации между ними.
В нашем примере: Source -> FlatMap -> Map (если все имеют parallelism=2 и нет keyBy/rebalance между ними) — могут быть склеены в один chain. Это значит, что вместо 3 операторов в Web UI вы увидите 1: “Source -> FlatMap -> Map”.
Преимущества chain:
- Нет сериализации данных между операторами в одном chain — данные передаются как Java objects напрямую.
- Нет сетевой передачи — всё в одном JVM.
- Меньше threads — meньше context switching.
Когда chain нарушается:
- keyBy — обязательный shuffle, нельзя chain.
- Разный parallelism соседних операторов — chain нарушается.
- Явное
.disableChaining()или.startNewChain()— programmer override. - Между операторами есть
.rebalance()/.shuffle()/.broadcast()— нарушается.
Когда смотрите job graph в Web UI, видите chained операторы как один блок (хотя в коде это несколько вызовов). Это нормально и хорошо для performance. Иногда полезно нарочно нарушить chain через .disableChaining() — если один из операторов медленный, его проще профайлить отдельно.
ExecutionGraph и subtasks
После того как JobGraph отправлен JobManager’у, он трансформируется в ExecutionGraph — runtime представление с конкретными subtasks.
JobGraph (logical) -> ExecutionGraph (physical) -> subtasks running на TaskManager slots.
Если у оператора parallelism=4, его ExecutionGraph содержит 4 execution vertex (subtasks). Каждый subtask — отдельный thread в TaskManager’е, обрабатывающий свою долю данных.
Распределение subtasks по slots:
- Каждый TaskManager имеет
taskmanager.numberOfTaskSlotsslots (типично 2-8). - Slot — единица выделения ресурсов. Один subtask = занимает один slot (если slot sharing не включён, что по умолчанию ДА).
- Slot sharing: subtasks разных операторов одного job могут делить один slot. Это уменьшает количество TaskManager’ов, нужных для job.
Пример: job с операторами Source/parallelism=2, KeyedReduce/parallelism=4, Sink/parallelism=2. Без slot sharing нужно 2+4+2=8 slots. Со slot sharing — достаточно max(2,4,2)=4 slot. Огромная разница.
Slot sharing включён по умолчанию и рекомендуется для большинства случаев. Disabling — для специфических ситуаций, когда нужно изолировать heavy operator (например, операторы с большим state, которые могут мешать друг другу).
Pipeline parallelism: где быстро, где медленно
Важно понимать: операторы внутри job выполняются ОДНОВРЕМЕННО, не последовательно. Pipeline parallelism.
Когда событие N обрабатывается в KeyedReduce, событие N+1 уже в FlatMap, а событие N+2 ещё в Source. Все они движутся через pipeline одновременно.
Это даёт streaming низкую end-to-end latency: событие “пришло -> ушло в sink” за миллисекунды, потому что не нужно ждать batch.
Но это же означает: самый медленный оператор определяет throughput всего job. Если KeyedReduce обрабатывает 1000 событий/сек, а Source может производить 10000/сек — job будет работать на 1000/сек, и накопится backpressure перед KeyedReduce.
Backpressure — главный диагностический сигнал. Если видите в Web UI HIGH backpressure на каком-то операторе — это узкое место. Решения:
- Увеличить parallelism этого оператора (если он keyed — нужен скейлинг и upstream, чтобы переразбить ключи).
- Оптимизировать код оператора (профайлинг, удалить аллокации, кешировать вычисления).
- Заменить sync операции на async (модуль 09 про Async I/O).
- Изменить сериализацию на более быструю (Avro вместо JSON, POJO вместо Kryo).
Граф пример: реальный CDC pipeline
Чтобы закрепить картину — реальный production CDC pipeline:
Postgres CDC (parallelism=1)
Postgres CDC Source: читает WAL через logical replication. Parallelism = 1 (источник логически single-threaded). Сразу bottleneck, но WAL читается быстро.Filter + Map (parallelism=8)
Filter + Map: фильтрация по типу operation (только INSERT/UPDATE/DELETE для нужных таблиц), нормализация. Rebalance из 1 source в 8 параллельных filter subtasks — round-robin.Dedup + Schema enforce (parallelism=8)
KeyedProcess: дедупликация и schema enforcement по ключу (table_name, primary_key). Каждый ключ в одном subtask, корректная история на ключ.Kafka Sink (parallelism=8)
Kafka Sink: пишет в outbox-топик для downstream consumers (микросервисы). Parallelism = 8 (matches Kafka partitions).Iceberg Sink (parallelism=4)
Iceberg Sink: пишет в Apache Iceberg таблицу через streaming writer. Parallelism = 4 (меньше, чем upstream — Iceberg writes более тяжёлые).Из этой картины сразу видно:
- Source — single-threaded (так устроена Postgres logical replication). Это потенциальное узкое место, нужно мониторить throughput.
- Filter и Dedup — параллельны (8 каждый). Если backpressure тут — увеличиваем parallelism, разбиваем ключ детальнее.
- Sinks разной параллельности — Kafka соответствует партициям, Iceberg меньше (потому что pisat в parquet/orc файлы — операция тяжёлая).
- Rebalance после single-threaded source — критично для распределения нагрузки.
- keyBy перед Dedup — обеспечивает корректность (один ключ в одном state).
Эта картина — типична для production. Когда вы увидите такой граф в Web UI, не пугайтесь — это нормальная структура CDC pipeline.
Запуск job: что происходит
Когда вы вызываете env.execute():
Этот процесс — milliseconds до seconds. Большая часть времени — выделение slots и deployment JAR (особенно для больших JAR с heavy dependencies).
Попробуй сам
Откройте Web UI с running job из урока 00.3 (WordCount):
- Найдите Job Graph. Сколько операторов? Какие имена?
- Кликните на каждый оператор — какой у него parallelism? Какие input/output partitioning стрелки видите?
- Найдите subtasks. Сколько их у каждого оператора? Сколько
records inобработал каждый? - Какие операторы chained? Если видите блок типа “Source -> Map” — это chain. Сколько таких chain в job?
- Поэкспериментируйте: остановите job, измените parallelism через
env.setParallelism(4)или на операторе явно, перезапустите. Увидите новую структуру.
Это упражнение даёт интуицию, которую ничем не заменишь.