Learning Platform
Глоссарий Troubleshooting
Урок 02.04 · 17 мин
Средний
Flink JobDAGOperatorsParallelismSubtasks

Анатомия Flink-job: DAG, операторы, parallelism

Каждый раз, когда вы вызываете env.execute(), Flink собирает ваш код в JobGraph — направленный ацикличный граф операторов — и отправляет его в кластер. Понимание этой модели — мостик между “написал код” и “понимаю, что происходит в Web UI”. Без неё performance tuning и troubleshooting превращаются в гадание.

К концу этого урока вы сможете посмотреть на job graph в Web UI и сразу видеть: где источник данных, где операторы преобразования, где sink, какой parallelism у каждого, и где между ними keyBy/rebalance/forward стрелки.


Что такое job: pipeline из операторов

Flink job — это dataflow program: source(s) -> transformations -> sink(s). Источник производит данные, операторы преобразуют их, sink записывает результат куда-то ещё. Между ними — потоки данных.

Минимальный пример WordCount словами:

Kafka source -> flatMap (split into words) -> keyBy(word) -> sum(count) -> Kafka sink

Это и есть job. У него есть:

  • Source — откуда берутся данные. KafkaSource, FileSource, JdbcSource.
  • Transformations — что делается с данными. map, filter, flatMap, keyBy, reduce, window.
  • Sink — куда пишется результат. KafkaSink, FileSink, JdbcSink.

В Java-коде это выглядит примерно так:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("kafka:9092")
    .setTopics("input-words")
    .setGroupId("wordcount-job")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

DataStream<String> words = env
    .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
    .flatMap((String line, Collector<String> out) -> {
        for (String word : line.toLowerCase().split("\\s+")) {
            out.collect(word);
        }
    }).returns(Types.STRING);

DataStream<Tuple2<String, Long>> counts = words
    .map(w -> Tuple2.of(w, 1L)).returns(Types.TUPLE(Types.STRING, Types.LONG))
    .keyBy(t -> t.f0)
    .sum(1);

KafkaSink<String> sink = KafkaSink.<String>builder()
    .setBootstrapServers("kafka:9092")
    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
        .setTopic("output-counts")
        .setValueSerializationSchema(new SimpleStringSchema())
        .build())
    .build();

counts.map(t -> t.f0 + "," + t.f1).sinkTo(sink);

env.execute("WordCount Job");

env.execute() — момент, когда Flink собирает всё это в JobGraph и отправляет в кластер. Всё, что до этого, — просто построение графа в памяти.


JobGraph: модель выполнения

JobGraph — это DAG (directed acyclic graph) операторов. Вершины — операторы; рёбра — потоки данных между ними.

DAG Scheduler в Spark: похожая концепция, batch-семантика
WordCount JobGraph

Source: KafkaSource (parallelism=2)

Source: KafkaSource. Читает из топика input-words. Parallelism в нашем примере = 2 (по числу партиций Kafka, или setParallelism). Каждый subtask читает свой набор партиций.
forward (1-to-1)

FlatMap: split words (parallelism=2)

FlatMap: разбивает строку на слова. Stateless оператор. Forward partitioning — каждый subtask source отправляет в свой subtask flatMap (нет shuffle).
hash (keyBy shuffle)

KeyedReduce: sum (parallelism=4)

KeyedReduce: keyBy(word) распределяет события по hash(word) — одинаковое слово всегда попадает в один subtask. Sum обновляет state (count per word). Здесь parallelism может быть другим (например, 4).
forward / hash

Sink: KafkaSink (parallelism=2)

Sink: KafkaSink. Пишет в топик output-counts. Parallelism = 2 — соответствует партициям выходного топика для оптимального write.

Несколько важных вещей видны на этой диаграмме:

Каждый оператор имеет parallelism — количество параллельных subtasks. По умолчанию parallelism job-уровня (env.setParallelism(2)), но можно переопределить на каждом операторе (stream.keyBy(...).sum(1).setParallelism(4)).

Между операторами разные типы partitioning — forward (1-to-1, без сетевой передачи), hash (keyBy — по хэшу ключа), rebalance (round-robin), broadcast и другие. Это покрывается подробно в модуле 03.

Шаг keyBy критичен — он делает partitioning по hash(key). Каждое слово всегда попадает в один subtask sum, что обеспечивает корректную аггрегацию.


Operator chains: оптимизация

Flink делает оптимизацию: соседние операторы с forward partitioning и одинаковым parallelism склеиваются в operator chain и выполняются в одном thread без сериализации/десериализации между ними.

В нашем примере: Source -> FlatMap -> Map (если все имеют parallelism=2 и нет keyBy/rebalance между ними) — могут быть склеены в один chain. Это значит, что вместо 3 операторов в Web UI вы увидите 1: “Source -> FlatMap -> Map”.

Преимущества chain:

  • Нет сериализации данных между операторами в одном chain — данные передаются как Java objects напрямую.
  • Нет сетевой передачи — всё в одном JVM.
  • Меньше threads — meньше context switching.

Когда chain нарушается:

  • keyBy — обязательный shuffle, нельзя chain.
  • Разный parallelism соседних операторов — chain нарушается.
  • Явное .disableChaining() или .startNewChain() — programmer override.
  • Между операторами есть .rebalance() / .shuffle() / .broadcast() — нарушается.

Когда смотрите job graph в Web UI, видите chained операторы как один блок (хотя в коде это несколько вызовов). Это нормально и хорошо для performance. Иногда полезно нарочно нарушить chain через .disableChaining() — если один из операторов медленный, его проще профайлить отдельно.


ExecutionGraph и subtasks

После того как JobGraph отправлен JobManager’у, он трансформируется в ExecutionGraph — runtime представление с конкретными subtasks.

JobGraph (logical) -> ExecutionGraph (physical) -> subtasks running на TaskManager slots.

Если у оператора parallelism=4, его ExecutionGraph содержит 4 execution vertex (subtasks). Каждый subtask — отдельный thread в TaskManager’е, обрабатывающий свою долю данных.

Распределение subtasks по slots:

  • Каждый TaskManager имеет taskmanager.numberOfTaskSlots slots (типично 2-8).
  • Slot — единица выделения ресурсов. Один subtask = занимает один slot (если slot sharing не включён, что по умолчанию ДА).
  • Slot sharing: subtasks разных операторов одного job могут делить один slot. Это уменьшает количество TaskManager’ов, нужных для job.

Пример: job с операторами Source/parallelism=2, KeyedReduce/parallelism=4, Sink/parallelism=2. Без slot sharing нужно 2+4+2=8 slots. Со slot sharing — достаточно max(2,4,2)=4 slot. Огромная разница.

NOTE

Slot sharing включён по умолчанию и рекомендуется для большинства случаев. Disabling — для специфических ситуаций, когда нужно изолировать heavy operator (например, операторы с большим state, которые могут мешать друг другу).


Pipeline parallelism: где быстро, где медленно

Важно понимать: операторы внутри job выполняются ОДНОВРЕМЕННО, не последовательно. Pipeline parallelism.

Когда событие N обрабатывается в KeyedReduce, событие N+1 уже в FlatMap, а событие N+2 ещё в Source. Все они движутся через pipeline одновременно.

Это даёт streaming низкую end-to-end latency: событие “пришло -> ушло в sink” за миллисекунды, потому что не нужно ждать batch.

Но это же означает: самый медленный оператор определяет throughput всего job. Если KeyedReduce обрабатывает 1000 событий/сек, а Source может производить 10000/сек — job будет работать на 1000/сек, и накопится backpressure перед KeyedReduce.

Backpressure — главный диагностический сигнал. Если видите в Web UI HIGH backpressure на каком-то операторе — это узкое место. Решения:

  • Увеличить parallelism этого оператора (если он keyed — нужен скейлинг и upstream, чтобы переразбить ключи).
  • Оптимизировать код оператора (профайлинг, удалить аллокации, кешировать вычисления).
  • Заменить sync операции на async (модуль 09 про Async I/O).
  • Изменить сериализацию на более быструю (Avro вместо JSON, POJO вместо Kryo).

Граф пример: реальный CDC pipeline

Чтобы закрепить картину — реальный production CDC pipeline:

Production CDC pipeline: Postgres -> Flink -> Kafka + Iceberg

Postgres CDC (parallelism=1)

Postgres CDC Source: читает WAL через logical replication. Parallelism = 1 (источник логически single-threaded). Сразу bottleneck, но WAL читается быстро.
rebalance (1 -> 8)

Filter + Map (parallelism=8)

Filter + Map: фильтрация по типу operation (только INSERT/UPDATE/DELETE для нужных таблиц), нормализация. Rebalance из 1 source в 8 параллельных filter subtasks — round-robin.
keyBy(table, pk)

Dedup + Schema enforce (parallelism=8)

KeyedProcess: дедупликация и schema enforcement по ключу (table_name, primary_key). Каждый ключ в одном subtask, корректная история на ключ.
broadcast / sink-specific

Kafka Sink (parallelism=8)

Kafka Sink: пишет в outbox-топик для downstream consumers (микросервисы). Parallelism = 8 (matches Kafka partitions).

Iceberg Sink (parallelism=4)

Iceberg Sink: пишет в Apache Iceberg таблицу через streaming writer. Parallelism = 4 (меньше, чем upstream — Iceberg writes более тяжёлые).

Из этой картины сразу видно:

  • Source — single-threaded (так устроена Postgres logical replication). Это потенциальное узкое место, нужно мониторить throughput.
  • Filter и Dedup — параллельны (8 каждый). Если backpressure тут — увеличиваем parallelism, разбиваем ключ детальнее.
  • Sinks разной параллельности — Kafka соответствует партициям, Iceberg меньше (потому что pisat в parquet/orc файлы — операция тяжёлая).
  • Rebalance после single-threaded source — критично для распределения нагрузки.
  • keyBy перед Dedup — обеспечивает корректность (один ключ в одном state).

Эта картина — типична для production. Когда вы увидите такой граф в Web UI, не пугайтесь — это нормальная структура CDC pipeline.


Запуск job: что происходит

Когда вы вызываете env.execute():

Submit job: что происходит под капотом
Шаг 1: ClientВаш код выполняется в клиенте (вашем приложении или Flink CLI). Метод execute() строит JobGraph из всех вызовов API. JobGraph сериализуется.
Шаг 2: Submit to JobManagerJobGraph + JAR-файл с вашим кодом отправляются в JobManager через REST или RPC. JobManager сохраняет JAR в blob store, JobGraph в memory.
Шаг 3: ExecutionGraphJobManager строит ExecutionGraph — physical план с subtasks. Запрашивает у ResourceManager нужное количество slots на TaskManager'ах.
Шаг 4: Deploy subtasksJobManager отправляет на каждый TaskManager деплой каждого subtask. TaskManager скачивает JAR из blob store, создаёт thread для subtask.
Шаг 5: RunningSubtasks начинают обработку. Source poll'ит данные, передаёт downstream. Job в состоянии RUNNING. Checkpoints начинают регулярно срабатывать.

Этот процесс — milliseconds до seconds. Большая часть времени — выделение slots и deployment JAR (особенно для больших JAR с heavy dependencies).


Попробуй сам

Откройте Web UI с running job из урока 00.3 (WordCount):

  1. Найдите Job Graph. Сколько операторов? Какие имена?
  2. Кликните на каждый оператор — какой у него parallelism? Какие input/output partitioning стрелки видите?
  3. Найдите subtasks. Сколько их у каждого оператора? Сколько records in обработал каждый?
  4. Какие операторы chained? Если видите блок типа “Source -> Map” — это chain. Сколько таких chain в job?
  5. Поэкспериментируйте: остановите job, измените parallelism через env.setParallelism(4) или на операторе явно, перезапустите. Увидите новую структуру.

Это упражнение даёт интуицию, которую ничем не заменишь.

Проверка знанийKnowledge check
У вас Flink job с структурой: Kafka Source (parallelism=4) -> Filter -> keyBy(userId) -> Aggregate -> Kafka Sink. Source чтает 100k records/sec без backpressure. Но Aggregate показывает backpressure HIGH, а throughput на Sink — всего 30k records/sec. Что наиболее вероятная причина и какие шаги диагностики?
ОтветAnswer
Причина: Aggregate — узкое место pipeline. Backpressure HIGH означает, что Aggregate не успевает обработать поступающие записи; через back-pressure это распространяется upstream и в итоге Source тоже замедляется (но Source может показывать ОК статус, потому что Kafka буфферизирует). Throughput всего pipeline ограничен самым медленным оператором = 30k/sec в Aggregate. Шаги диагностики: (1) увеличить parallelism Aggregate (например, до 8); (2) проверить, нет ли hot keys — если один user_id даёт 80% траффика, parallelism не поможет, нужен другой ключ или pre-aggregation; (3) профайлить код Aggregate через Web UI flame graphs — возможно, медленная сериализация или внешний вызов; (4) проверить, не используется ли Kryo fallback (медленная сериализация — модуль 03 урок 5).

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Что такое operator chain в Flink и в каких условиях соседние операторы chain'ятся автоматически?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4