Когда вы пишете DataStream-программу, ваш код описывает логические operations: source, map, keyBy, window, sink. Этот код не исполняется напрямую — он сначала компилируется в три разных уровня графа, каждый со своей семантикой и оптимизациями. Понимание этой трансформации необходимо для дебаггинга performance: когда вы смотрите Job graph в Web UI и видите “одну box” из map + filter + sink, это не баг визуализации — это operator chaining, реальный merge нескольких ваших операторов в один runtime Task.
В этом уроке разберём три уровня графа: StreamGraph (что генерируется из user-кода), JobGraph (после оптимизаций, отправляется в JM), ExecutionGraph (runtime-граф с parallelism expansion).
DAG Scheduler в Spark: от действия к стадиям Анатомия Flink-job: DAG, операторы, parallelismТри уровня: общая картина
Каждая трансформация — это отдельный класс генератора, который преобразует один граф в другой, применяя specific optimizations.
StreamGraph: логический граф
StreamGraph — это first representation вашей DataStream-программы. Генерируется на client side (там, где вы вызвали env.execute()).
Главный класс: org.apache.flink.streaming.api.graph.StreamGraph. Генератор: org.apache.flink.streaming.api.graph.StreamGraphGenerator.
Структура StreamGraph:
- StreamNode: один логический оператор (Source, Map, KeyBy, и т.д.). Содержит:
- operatorID
- operatorName (“Map”, “Source: KafkaSource”, “Window: tumbling(5min)”)
- parallelism
- StreamOperatorFactory (как создать runtime instance)
- StateBackend (если specified)
- SlotSharingGroup
- StreamEdge: связь между двумя StreamNode. Содержит:
- source/target StreamNode IDs
- partitioner (RebalancePartitioner, KeyGroupStreamPartitioner, ForwardPartitioner, …)
- typeSerializer
- shuffleMode (PIPELINED, BLOCKING, или UNDEFINED)
Пример
DataStream:
DataStream<Event> source = env.fromSource(kafkaSource, ..., "kafka-src");
DataStream<EnrichedEvent> mapped = source.map(new EnrichFunction()).name("enrich");
DataStream<EnrichedEvent> filtered = mapped.filter(e -> e.isValid()).name("filter");
DataStream<UserStats> aggregated = filtered
.keyBy(e -> e.userId)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.aggregate(new StatsAggregator()).name("aggregate");
aggregated.sinkTo(jdbcSink).name("jdbc-sink");
StreamGraph будет содержать:
Здесь edge между filter и aggregate имеет KeyGroupStreamPartitioner — это значит, после filter данные re-partition по hash(key), чтобы window получил все события одного key.
Все edges между другими nodes — ForwardPartitioner: данные идут напрямую в downstream instance с тем же index.
Что StreamGraph не имеет
- Operator chaining не применено. Каждый node — отдельный.
- JobGraph optimizations ещё не применены.
- Parallelism instances не expanded. Только parallelism как property.
Visualization StreamGraph можно получить через env.getStreamGraph().getStreamingPlanAsJSON() — выдаёт JSON, который можно загрузить в Flink Plan Visualizer.
JobGraph: после chaining
JobGraph — это optimized representation, готовая к отправке в JobMaster. Главная optimization: operator chaining.
Главный класс: org.apache.flink.runtime.jobgraph.JobGraph. Генератор: org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.
Структура JobGraph:
- JobVertex: один или несколько chained StreamNode. Это unit of deployment. Когда JobMaster деплоит Task — он деплоит один JobVertex (с parallelism expansion).
- IntermediateDataSet: output JobVertex, который consumed by another JobVertex (downstream).
- JobEdge: связь между двумя JobVertex через IntermediateDataSet. Описывает distribution pattern (POINTWISE = ForwardPartitioner, ALL_TO_ALL = другие partitioners).
Operator chaining: правила
Два соседних StreamNode chained в один JobVertex, если все условия:
- Same slot sharing group. По умолчанию все — в “default”, так что это выполняется.
- Forwarding partitioner. ForwardPartitioner = chaining ok. Любой shuffling partitioner (KeyGroup, Rebalance, Shuffle, Broadcast) — break chain.
- Same parallelism. Если parallelism разный — необходим shuffle — break chain.
- Source operator (для downstream): только если downstream chaining strategy = HEAD (источник цепочки) или ALWAYS.
- Chaining strategy = ALWAYS для downstream operator. Можно отключить через
.disableChaining(). - Operator chaining включён globally (по умолчанию yes).
Для нашего примера:
- kafka-src -> enrich: same parallelism (4), forward partitioner, same SSG -> chained.
- enrich -> filter: same parallelism, forward -> chained.
- filter -> aggregate: KeyGroup partitioner (после keyBy) -> NOT chained.
- aggregate -> jdbc-sink: same parallelism, forward -> chained.
JobGraph:
Результат: вместо 5 StreamNode -> 2 JobVertex. Это значит 2 Tasks per parallel instance, не 5. Если parallelism = 4, total Tasks = 8 (не 20).
Зачем chaining?
- Performance: внутри chain data передаётся через method calls (
output.collect(record)) вместо serialization + network buffer. Это 5-10x быстрее. - Memory: меньше buffers, меньше serializers.
- Latency: меньше hops, меньше queuing.
Без chaining Flink был бы значительно медленнее. Это default optimization, которая работает прозрачно.
Когда chaining отключать
В редких случаях:
mapped.disableChaining(); // explicit break
Полезно, когда:
- Хотите отдельные Web UI metrics per operator (chained operators show as one Task в metrics).
- Хотите изолировать performance impact (если один operator slow, не хочется блокировать другие в том же thread).
Но в 99% случаев — оставляйте chaining включенным.
ExecutionGraph: runtime граф
ExecutionGraph — это runtime representation в JobMaster. Это где parallelism expansion случается: каждый JobVertex с parallelism=N -> N ExecutionVertex (по одной на parallel instance).
Главный класс: org.apache.flink.runtime.executiongraph.ExecutionGraph. Сопутствующие:
- ExecutionJobVertex: соответствует одному JobVertex.
- ExecutionVertex: одна параллельная инстанция.
- Execution: один attempt deployment ExecutionVertex (если задача failed и retry — новый Execution).
Структура для нашего примера (parallelism = 4):
Каждая ExecutionVertex — это unit of scheduling. Scheduler решает, в какой slot deploy конкретную vertex, и инициирует submitTask RPC.
Execution state machine
Каждая ExecutionVertex имеет state machine (через Execution object):
Когда вы открываете Web UI и видите статус Task — это его state в state machine. Если есть FAILED Task-и — это alert.
Connection patterns: POINTWISE vs ALL_TO_ALL
JobEdge между двумя JobVertex имеет distribution pattern:
- POINTWISE: каждая parallel instance source-а connect-ится к одной или нескольким parallel instances target-а. ForwardPartitioner — это POINTWISE 1-to-1 (same parallelism).
- ALL_TO_ALL: каждая parallel instance source-а connect-ится к всем parallel instances target-а. KeyGroup, Rebalance, Broadcast — это ALL_TO_ALL.
В ExecutionGraph эти patterns expanded:
POINTWISE 1-to-1, parallelism 4:
Source 0 -> Target 0
Source 1 -> Target 1
Source 2 -> Target 2
Source 3 -> Target 3
POINTWISE 1-to-N (parallelism rescale):
Source 0 -> Target 0, Target 1
Source 1 -> Target 2, Target 3
ALL_TO_ALL:
Source 0 -> Target 0, 1, 2, 3
Source 1 -> Target 0, 1, 2, 3
...
При создании ExecutionGraph для ALL_TO_ALL создаются N×N IntermediateResultPartition + InputChannel, что может быть много connections при больших parallelism.
Generation: client side vs JM side
Где какой граф генерируется:
| Граф | Где генерируется | Когда |
|---|---|---|
| StreamGraph | Client (где env.execute()) | При сборке DataStream |
| JobGraph | Client (transform StreamGraph -> JobGraph) | Перед submission |
| ExecutionGraph | JobMaster | После JobGraph submission и start JobMaster |
JobGraph serialized и отправляется через REST API к Dispatcher. На JobMaster происходит deserialization и build ExecutionGraph.
Это значит:
- Если вы хотите inspect graph before submission, делайте это на client:
env.getStreamGraph()илиenv.getStreamGraph().getJobGraph(). - Если хотите inspect runtime граф, делайте это через JM REST API:
GET /jobs/{job-id}/plan.
Where in Web UI
Web UI Flink-а показывает JobGraph (logical) при общем view job-а. Когда кликнете на vertex, увидите ExecutionVertex subtasks (parallelism expansion).
Если вы видите в Web UI 5 boxes для job, который вы написали из 7 операторов — это значит operator chaining merged 7 операторов в 5 JobVertex.
Code references
org.apache.flink.streaming.api.graph.StreamGraph: главный класс. МетодgetStreamingPlanAsJSON()— для visualizer.org.apache.flink.streaming.api.graph.StreamGraphGenerator: build StreamGraph из StreamExecutionEnvironment.transformations.org.apache.flink.runtime.jobgraph.JobGraph: главный класс JobGraph.org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator: build JobGraph из StreamGraph. МетодcreateJobGraph— main entry. Внутри:setChaining— apply chainingsetSlotSharingAndCoLocationsetManagedMemoryFraction
org.apache.flink.runtime.executiongraph.ExecutionGraph: главный класс ExecutionGraph.org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder(исторически) илиDefaultExecutionGraphFactory(новее) — build ExecutionGraph из JobGraph.
Чтобы trace, как ваш код становится runtime, начните с StreamGraphGenerator.generate() и идите вниз по стеку.
Debugging: inspect graph
Полезные REST endpoints:
# JobGraph (logical):
curl http://jm:8081/jobs/{job-id}/plan | jq .
# ExecutionGraph + state:
curl http://jm:8081/jobs/{job-id} | jq .
# Конкретный vertex:
curl http://jm:8081/jobs/{job-id}/vertices/{vertex-id} | jq .
# Subtasks конкретного vertex:
curl http://jm:8081/jobs/{job-id}/vertices/{vertex-id}/subtasktimes | jq .
/plan endpoint возвращает JSON в формате Flink Plan Visualizer — можно скопировать его и paste в https://flink.apache.org/visualizer для красивой визуализации.
Что дальше
Следующий урок — про HA и leader election. Как JobManager переживает failure: ZooKeeper-based или K8s-based leader election, JobGraphStore, CompletedCheckpointStore, recovery flow.
После — последний урок модуля 02: scheduler types (Default vs Adaptive vs Adaptive Batch).