Learning Platform
Глоссарий Troubleshooting
Урок 03.03 · 26 мин
Продвинутый
StreamGraphJobGraphExecutionGraphOperator chainingGraph transformations

Когда вы пишете DataStream-программу, ваш код описывает логические operations: source, map, keyBy, window, sink. Этот код не исполняется напрямую — он сначала компилируется в три разных уровня графа, каждый со своей семантикой и оптимизациями. Понимание этой трансформации необходимо для дебаггинга performance: когда вы смотрите Job graph в Web UI и видите “одну box” из map + filter + sink, это не баг визуализации — это operator chaining, реальный merge нескольких ваших операторов в один runtime Task.

В этом уроке разберём три уровня графа: StreamGraph (что генерируется из user-кода), JobGraph (после оптимизаций, отправляется в JM), ExecutionGraph (runtime-граф с parallelism expansion).

DAG Scheduler в Spark: от действия к стадиям Анатомия Flink-job: DAG, операторы, parallelism

Три уровня: общая картина

Pipeline трансформаций
User codeDataStream-программа: env.fromSource(...).map(...).keyBy(...).window(...).process(...).sinkTo(...). Логическое описание pipeline.
StreamGraphGenerator
StreamGraphЛогический граф. Каждая операция = StreamNode. Полная репрезентация без оптимизаций. Содержит StreamEdges между nodes.
StreamingJobGraphGenerator
JobGraphОптимизированный граф для отправки в кластер. Operator chaining применено. Каждый JobVertex = один или несколько chained operators. Готов к serialize и REST submission.
ExecutionGraphBuilder
ExecutionGraphRuntime-граф в JobMaster. Каждая ExecutionJobVertex = JobVertex с parallelism expansion: N ExecutionVertex (по одной на parallel instance). Track state RUNNING/FAILED/FINISHED.

Каждая трансформация — это отдельный класс генератора, который преобразует один граф в другой, применяя specific optimizations.

StreamGraph: логический граф

StreamGraph — это first representation вашей DataStream-программы. Генерируется на client side (там, где вы вызвали env.execute()).

Главный класс: org.apache.flink.streaming.api.graph.StreamGraph. Генератор: org.apache.flink.streaming.api.graph.StreamGraphGenerator.

Структура StreamGraph:

  • StreamNode: один логический оператор (Source, Map, KeyBy, и т.д.). Содержит:
    • operatorID
    • operatorName (“Map”, “Source: KafkaSource”, “Window: tumbling(5min)”)
    • parallelism
    • StreamOperatorFactory (как создать runtime instance)
    • StateBackend (если specified)
    • SlotSharingGroup
  • StreamEdge: связь между двумя StreamNode. Содержит:
    • source/target StreamNode IDs
    • partitioner (RebalancePartitioner, KeyGroupStreamPartitioner, ForwardPartitioner, …)
    • typeSerializer
    • shuffleMode (PIPELINED, BLOCKING, или UNDEFINED)

Пример

DataStream:

DataStream<Event> source = env.fromSource(kafkaSource, ..., "kafka-src");
DataStream<EnrichedEvent> mapped = source.map(new EnrichFunction()).name("enrich");
DataStream<EnrichedEvent> filtered = mapped.filter(e -> e.isValid()).name("filter");
DataStream<UserStats> aggregated = filtered
    .keyBy(e -> e.userId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .aggregate(new StatsAggregator()).name("aggregate");
aggregated.sinkTo(jdbcSink).name("jdbc-sink");

StreamGraph будет содержать:

StreamGraph для примера
StreamNode: kafka-srcSource operator. parallelism=4, ForwardPartitioner downstream.
forward
StreamNode: enrichMap operator. parallelism=4, ForwardPartitioner downstream.
StreamNode: filterFilter operator. parallelism=4, KeyGroupStreamPartitioner downstream (потому что после filter — keyBy).
forward
StreamNode: aggregateWindow aggregate operator. parallelism=4, ForwardPartitioner downstream.
StreamNode: jdbc-sinkSink operator. parallelism=4.

Здесь edge между filter и aggregate имеет KeyGroupStreamPartitioner — это значит, после filter данные re-partition по hash(key), чтобы window получил все события одного key.

Все edges между другими nodes — ForwardPartitioner: данные идут напрямую в downstream instance с тем же index.

Что StreamGraph не имеет

  • Operator chaining не применено. Каждый node — отдельный.
  • JobGraph optimizations ещё не применены.
  • Parallelism instances не expanded. Только parallelism как property.

Visualization StreamGraph можно получить через env.getStreamGraph().getStreamingPlanAsJSON() — выдаёт JSON, который можно загрузить в Flink Plan Visualizer.

JobGraph: после chaining

JobGraph — это optimized representation, готовая к отправке в JobMaster. Главная optimization: operator chaining.

Главный класс: org.apache.flink.runtime.jobgraph.JobGraph. Генератор: org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.

Структура JobGraph:

  • JobVertex: один или несколько chained StreamNode. Это unit of deployment. Когда JobMaster деплоит Task — он деплоит один JobVertex (с parallelism expansion).
  • IntermediateDataSet: output JobVertex, который consumed by another JobVertex (downstream).
  • JobEdge: связь между двумя JobVertex через IntermediateDataSet. Описывает distribution pattern (POINTWISE = ForwardPartitioner, ALL_TO_ALL = другие partitioners).

Operator chaining: правила

Два соседних StreamNode chained в один JobVertex, если все условия:

  1. Same slot sharing group. По умолчанию все — в “default”, так что это выполняется.
  2. Forwarding partitioner. ForwardPartitioner = chaining ok. Любой shuffling partitioner (KeyGroup, Rebalance, Shuffle, Broadcast) — break chain.
  3. Same parallelism. Если parallelism разный — необходим shuffle — break chain.
  4. Source operator (для downstream): только если downstream chaining strategy = HEAD (источник цепочки) или ALWAYS.
  5. Chaining strategy = ALWAYS для downstream operator. Можно отключить через .disableChaining().
  6. Operator chaining включён globally (по умолчанию yes).

Для нашего примера:

  • kafka-src -> enrich: same parallelism (4), forward partitioner, same SSG -> chained.
  • enrich -> filter: same parallelism, forward -> chained.
  • filter -> aggregate: KeyGroup partitioner (после keyBy) -> NOT chained.
  • aggregate -> jdbc-sink: same parallelism, forward -> chained.

JobGraph:

JobGraph для примера: chaining применён
JobVertex 1: Source -> Enrich -> FilterТри chained operators: kafka-src + enrich + filter. Все в одном Task. Internal data flow через method calls, no serialization. parallelism=4.
ALL_TO_ALL (KeyGroup shuffle)
JobVertex 2: Aggregate -> SinkДва chained operators: aggregate + jdbc-sink. parallelism=4. Network input от Filter (через IntermediateDataSet).

Результат: вместо 5 StreamNode -> 2 JobVertex. Это значит 2 Tasks per parallel instance, не 5. Если parallelism = 4, total Tasks = 8 (не 20).

Зачем chaining?

  • Performance: внутри chain data передаётся через method calls (output.collect(record)) вместо serialization + network buffer. Это 5-10x быстрее.
  • Memory: меньше buffers, меньше serializers.
  • Latency: меньше hops, меньше queuing.

Без chaining Flink был бы значительно медленнее. Это default optimization, которая работает прозрачно.

Когда chaining отключать

В редких случаях:

mapped.disableChaining();  // explicit break

Полезно, когда:

  • Хотите отдельные Web UI metrics per operator (chained operators show as one Task в metrics).
  • Хотите изолировать performance impact (если один operator slow, не хочется блокировать другие в том же thread).

Но в 99% случаев — оставляйте chaining включенным.

ExecutionGraph: runtime граф

ExecutionGraph — это runtime representation в JobMaster. Это где parallelism expansion случается: каждый JobVertex с parallelism=N -> N ExecutionVertex (по одной на parallel instance).

Главный класс: org.apache.flink.runtime.executiongraph.ExecutionGraph. Сопутствующие:

  • ExecutionJobVertex: соответствует одному JobVertex.
  • ExecutionVertex: одна параллельная инстанция.
  • Execution: один attempt deployment ExecutionVertex (если задача failed и retry — новый Execution).

Структура для нашего примера (parallelism = 4):

ExecutionGraph для примера
ExecJobVertex 1Соответствует JobVertex 1 (Source -> Enrich -> Filter). Parallelism = 4, поэтому 4 ExecutionVertex.
ExecutionVertex 1.0Parallel instance 0 of JobVertex 1. Deploy в slot 0, runs Source/Enrich/Filter для subset partitions.
ExecutionVertex 1.1Parallel instance 1.
ExecutionVertex 1.2Parallel instance 2.
ExecutionVertex 1.3Parallel instance 3.
N x N shuffle (KeyGroup)
ExecJobVertex 2Соответствует JobVertex 2 (Aggregate -> Sink). Parallelism = 4.
ExecutionVertex 2.0Parallel instance 0 of JobVertex 2. Получает данные от всех vertices 1.0-1.3 (для своего range key groups).
ExecutionVertex 2.1Parallel instance 1.
ExecutionVertex 2.2Parallel instance 2.
ExecutionVertex 2.3Parallel instance 3.

Каждая ExecutionVertex — это unit of scheduling. Scheduler решает, в какой slot deploy конкретную vertex, и инициирует submitTask RPC.

Execution state machine

Каждая ExecutionVertex имеет state machine (через Execution object):

ExecutionVertex state machine
CREATEDInitial state. Vertex создана из JobVertex, но ещё не назначена slot.
SCHEDULEDSlot request отправлен к ResourceManager.
DEPLOYINGSlot выделен, submitTask RPC отправлен к TaskExecutor. Vertex deployed.
RUNNINGTask получен, started, invoke() работает. Главное operational state.
FINISHEDTask completed successfully (для bounded sources). Terminal state.
CANCELINGCancel request отправлен к Task. Task должен остановиться gracefully.
CANCELEDCancel completed. Terminal state.
FAILEDException в Task. Restart strategy решит, что делать дальше.
RECONCILINGПосле JM failover — JM пытается восстановить state Task-ов через query к TM-ам.
INITIALIZINGTask запущен, но ещё инициализируется (restoring state). Промежуточное state перед RUNNING.

Когда вы открываете Web UI и видите статус Task — это его state в state machine. Если есть FAILED Task-и — это alert.

Connection patterns: POINTWISE vs ALL_TO_ALL

JobEdge между двумя JobVertex имеет distribution pattern:

  • POINTWISE: каждая parallel instance source-а connect-ится к одной или нескольким parallel instances target-а. ForwardPartitioner — это POINTWISE 1-to-1 (same parallelism).
  • ALL_TO_ALL: каждая parallel instance source-а connect-ится к всем parallel instances target-а. KeyGroup, Rebalance, Broadcast — это ALL_TO_ALL.

В ExecutionGraph эти patterns expanded:

POINTWISE 1-to-1, parallelism 4:

Source 0 -> Target 0
Source 1 -> Target 1
Source 2 -> Target 2
Source 3 -> Target 3

POINTWISE 1-to-N (parallelism rescale):

Source 0 -> Target 0, Target 1
Source 1 -> Target 2, Target 3

ALL_TO_ALL:

Source 0 -> Target 0, 1, 2, 3
Source 1 -> Target 0, 1, 2, 3
...

При создании ExecutionGraph для ALL_TO_ALL создаются N×N IntermediateResultPartition + InputChannel, что может быть много connections при больших parallelism.

Generation: client side vs JM side

Где какой граф генерируется:

ГрафГде генерируетсяКогда
StreamGraphClient (где env.execute())При сборке DataStream
JobGraphClient (transform StreamGraph -> JobGraph)Перед submission
ExecutionGraphJobMasterПосле JobGraph submission и start JobMaster

JobGraph serialized и отправляется через REST API к Dispatcher. На JobMaster происходит deserialization и build ExecutionGraph.

Это значит:

  • Если вы хотите inspect graph before submission, делайте это на client: env.getStreamGraph() или env.getStreamGraph().getJobGraph().
  • Если хотите inspect runtime граф, делайте это через JM REST API: GET /jobs/{job-id}/plan.

Where in Web UI

Web UI Flink-а показывает JobGraph (logical) при общем view job-а. Когда кликнете на vertex, увидите ExecutionVertex subtasks (parallelism expansion).

Если вы видите в Web UI 5 boxes для job, который вы написали из 7 операторов — это значит operator chaining merged 7 операторов в 5 JobVertex.

Code references

  • org.apache.flink.streaming.api.graph.StreamGraph: главный класс. Метод getStreamingPlanAsJSON() — для visualizer.
  • org.apache.flink.streaming.api.graph.StreamGraphGenerator: build StreamGraph из StreamExecutionEnvironment.transformations.
  • org.apache.flink.runtime.jobgraph.JobGraph: главный класс JobGraph.
  • org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator: build JobGraph из StreamGraph. Метод createJobGraph — main entry. Внутри:
    • setChaining — apply chaining
    • setSlotSharingAndCoLocation
    • setManagedMemoryFraction
  • org.apache.flink.runtime.executiongraph.ExecutionGraph: главный класс ExecutionGraph.
  • org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder (исторически) или DefaultExecutionGraphFactory (новее) — build ExecutionGraph из JobGraph.

Чтобы trace, как ваш код становится runtime, начните с StreamGraphGenerator.generate() и идите вниз по стеку.

Debugging: inspect graph

Полезные REST endpoints:

# JobGraph (logical):
curl http://jm:8081/jobs/{job-id}/plan | jq .

# ExecutionGraph + state:
curl http://jm:8081/jobs/{job-id} | jq .

# Конкретный vertex:
curl http://jm:8081/jobs/{job-id}/vertices/{vertex-id} | jq .

# Subtasks конкретного vertex:
curl http://jm:8081/jobs/{job-id}/vertices/{vertex-id}/subtasktimes | jq .

/plan endpoint возвращает JSON в формате Flink Plan Visualizer — можно скопировать его и paste в https://flink.apache.org/visualizer для красивой визуализации.

Что дальше

Следующий урок — про HA и leader election. Как JobManager переживает failure: ZooKeeper-based или K8s-based leader election, JobGraphStore, CompletedCheckpointStore, recovery flow.

После — последний урок модуля 02: scheduler types (Default vs Adaptive vs Adaptive Batch).

Проверка знанийKnowledge check
Вы видите в Web UI Flink job: 4 boxes (JobVertex). Ваш код имеет 12 операторов: source -> map1 -> map2 -> keyBy -> window -> process -> map3 -> filter -> map4 -> keyBy -> reduce -> sink. Все parallelism = 8. Почему 4 boxes? Объясните, какие операторы chained вместе, и какие JobEdges имеют POINTWISE vs ALL_TO_ALL distribution.
ОтветAnswer
Operator chaining + 2 keyBy = 4 JobVertex. Группировка: JobVertex 1: source -> map1 -> map2 (форвард partitioner, same parallelism, all chained). JobVertex 2: window -> process -> map3 -> filter -> map4 (после первого keyBy данные re-partitioned, дальше эти 5 операторов chained, потому что forward partitioning между ними). JobVertex 3: reduce (после второго keyBy — это start новой chain). JobVertex 4: sink. Connections: source-map1-map2 (JV1) -> window-process-...-map4 (JV2) — ALL_TO_ALL distribution (keyBy = KeyGroupPartitioner = re-shuffle). map4 (JV2) -> reduce (JV3) — снова ALL_TO_ALL (keyBy). reduce (JV3) -> sink (JV4) — POINTWISE 1-to-1 (forward partitioner после reduce, same parallelism 8). Каждая ExecutionVertex одного JV соответствует одной parallel instance. Total ExecutionVertex: 4 JV × 8 parallelism = 32 vertices. Total Tasks: 32. Number of slots needed (с default slot sharing): 8 (по одной vertex каждого JV в один slot, 8 параллельных pipeline). Это elegant и compact — без chaining было бы 12 × 8 = 96 Tasks вместо 32, и performance был бы значительно хуже из-за serialization overhead на каждом chain boundary, которые сейчас merged.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Три уровня графа в Flink — что происходит на каждом, и где они генерируются?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5