Parallelism и partitioning стратегии
Когда оператор имеет parallelism > 1, нужно решить: как распределять события между параллельными subtasks следующего оператора? В Flink есть несколько стратегий partitioning, каждая со своим trade-off между producer-consumer balance, ordering guarantees, и сетевой нагрузкой. Этот урок — про практическое применение всех стратегий.
К концу урока вы будете понимать, какая стратегия применяется в каждой точке вашего pipeline (Web UI отображает их явно), и сможете осознанно выбирать .rebalance() vs .rescale() vs другие.
Parallelism: фундамент
parallelism оператора — это количество его параллельных subtasks. Чем выше, тем больше throughput, но и больше overhead (state distribution, coordination).
// Default parallelism для всего job
env.setParallelism(4);
// Override на отдельный оператор
stream
.map(...).setParallelism(2) // только 2 subtasks для map
.keyBy(...)
.sum(1).setParallelism(8); // 8 subtasks для sum
Maximum parallelism: hard limit, который задаётся на job. Влияет на структуру keyed state (key groups). Изменить max parallelism после первого запуска нельзя (он закодирован в state). Поэтому ставьте его щедро:
env.setMaxParallelism(256); // позволит scale up до 256 без recompute state
По умолчанию max parallelism = 128 (для маленьких parallelism) или вычисляется автоматически.
Partitioning: как распределяются события
Между двумя соседними операторами Flink применяет одну из стратегий partitioning. Каждая определяет, как событие из subtask A.i попадает в subtask B.j.
Forward: 1-to-1 без shuffle
Forward — самый дешёвый. Это когда два соседних оператора имеют одинаковый parallelism и нет explicit partitioning между ними.
DataStream<String> mapped = source.map(...); // parallelism = 4
DataStream<String> filtered = mapped.filter(...); // parallelism = 4
// Между map и filter — FORWARD. Они даже chain'ятся в один оператор в Web UI.
В Web UI стрелка между ними подписана FORWARD. Это и есть chain — нет shuffle, события передаются как Java objects в одном thread.
Когда forward автоматически:
- Соседние операторы одинакового parallelism.
- Нет keyBy / rebalance / rescale / broadcast / partitionCustom между ними.
Когда forward автоматически НЕ применяется:
- Разный parallelism — Flink не может 1-to-1 (например, 4 -> 2).
- keyBy или другой explicit partitioning.
keyBy (hash): для stateful aggregation
Самый частый partitioning после forward. Уже разобран в уроке 03.2.
stream.keyBy(event -> event.getUserId())
.sum(...);
Что происходит: hash(userId) % parallelism. Все события одного user — в один subtask. Это и есть основа keyed state.
Cost: shuffle по сети. Если subtasks на разных TaskManager’ах — сериализация и сетевая передача.
В Web UI стрелка после keyBy — HASH.
rebalance: round-robin
Когда нужен идеальный balance нагрузки, но не нужно keyed state — используйте rebalance():
DataStream<Event> imbalanced = source; // parallelism = 1 (CDC source)
DataStream<Event> balanced = imbalanced.rebalance(); // round-robin к down-stream
DataStream<Event> processed = balanced.map(...).setParallelism(8);
Каждое событие отправляется в следующий subtask по circular: subtask 0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, … Это даёт perfect balance.
Когда полезно:
- После single-threaded source (CDC, file source с одним splitter).
- Для распределения нагрузки на CPU-heavy операторе.
- После filter с большой selectivity (event drop пропорционально на subtask).
Cost: full shuffle — каждое событие может идти к любому subtask, включая на другом TaskManager.
В Web UI стрелка — REBALANCE.
rescale: local round-robin
Похож на rebalance, но только в пределах одного TaskManager (по возможности).
stream.rescale().map(...).setParallelism(4);
Если up-stream subtask на TM1, и на TM1 есть 2 down-stream subtasks, и на TM2 — другие 2, тогда:
- rescale: up-stream на TM1 отправляет только в 2 subtasks на TM1 (без сети).
- rebalance: up-stream на TM1 отправляет во все 4 subtasks (включая на TM2 — по сети).
Когда полезно:
- Когда нужен balance, но overhead сети нежелателен.
- При больших volumes, где сетевая нагрузка существенна.
Cost: дешевле rebalance, но balance менее perfect (если ratio не одинаковый, некоторые TMs получат больше).
В Web UI стрелка — RESCALE.
broadcast: на все subtasks
Каждое событие отправляется во ВСЕ subtasks down-stream:
DataStream<Rule> rules = source;
BroadcastStream<Rule> broadcastRules = rules.broadcast(rulesDescriptor);
// Используется в connect() с другим stream
DataStream<EnrichedEvent> enriched = mainStream
.connect(broadcastRules)
.process(new BroadcastEnrichFunction());
Когда нужно:
- Конфигурация / dictionary, которое нужно знать всем subtasks.
- Правила фильтрации, которые меняются on-the-fly (broadcast state pattern — модуль 08).
- Маленькие справочники (страны, currency codes).
Cost: высокий — каждое событие N раз по сети (N = parallelism). Не для high-throughput streams. Подходит только для маленьких control streams (правила, конфигурация).
В Web UI стрелка — BROADCAST.
Partitioning в Kafka: похожая логика распределенияpartitionCustom: своя логика
Для специфических случаев — свой Partitioner:
public class TimeBucketPartitioner implements Partitioner<Long> {
@Override
public int partition(Long timestamp, int numPartitions) {
long bucket = timestamp / 3600_000L; // hour bucket
return (int) (bucket % numPartitions);
}
}
stream.partitionCustom(new TimeBucketPartitioner(), event -> event.getTimestamp());
Это HASH с custom hash function. Используется редко — обычно достаточно keyBy или rebalance.
В Web UI стрелка — CUSTOM.
global: всё в один subtask
.global() — отправляет всё в subtask 0 down-stream. Down-stream получает parallelism = 1 эффективно.
stream.map(...).setParallelism(8)
.global()
.map(...); // эта map будет иметь parallelism = 1
Когда использовать:
- Финальная агрегация всех metrics в одно место.
- Sort всего потока (только если он bounded).
- Очень редко — обычно есть лучшие альтернативы.
Опасность: bottleneck — single thread обрабатывает всё. Не для high throughput.
Композиция: реальный pipeline
Postgres CDC (parallelism=1)
Postgres CDC source: parallelism = 1 (логически single-threaded — читает WAL). Это immediately bottleneck.Map + Filter (parallelism=8)
Map + Filter: разбирает CDC record, фильтрует non-relevant. parallelism = 8. Rebalance от source — каждое сообщение в следующий subtask по circular для perfect balance.Enrich (parallelism=8, FORWARD)
Enrich (lookup country): добавляет country по IP через RichMap. parallelism = 8 — соответствует upstream, поэтому FORWARD. Дешевле.KeyedAggregate (parallelism=8, HASH by userId)
KeyedAggregate: keyBy(userId) для stateful aggregation. parallelism = 8. Hash partitioning — все события одного user в один subtask.Kafka Sink (parallelism=8, FORWARD)
Kafka Sink: parallelism = 8 (matches partitions). Forward от KeyedAggregate.Metrics Sink (parallelism=1, GLOBAL)
Metrics broadcast: отправляет агрегаты во все subtasks Metrics Aggregator (parallelism=1) для финального reporting. BROADCAST по small stream.Реальная картина: в одном pipeline используются разные partitioning стратегии в зависимости от потребности на каждом шаге.
Production tips
-
Default parallelism set explicitly: не полагайтесь на default.
env.setParallelism(N)в начале job — best practice. -
Override parallelism на heavy операторах: если знаете, что Map дешевый, а Aggregate тяжёлый —
.setParallelism()на каждом операторе индивидуально. -
Max parallelism set generously:
env.setMaxParallelism(256)— позволяет future-scale без recompute state. Не используйте default. -
Rebalance после single-threaded source: КРИТИЧНО. CDC source даёт parallelism=1; без rebalance вся pipeline становится single-threaded.
-
Не используйте rebalance перед keyBy: keyBy сам делает shuffle. Rebalance впустую перераспределит, и потом keyBy опять перераспределит.
-
Forward chain — оптимизация: если операторы одного parallelism и нет partitioning между ними — они chain’ятся, что бесплатно. Не нарушайте chain без причины.
-
partition.discovery.interval.ms: для KafkaSource — если в продакшене добавляете партиции, source их подхватит автоматически.
Distinguishing: rebalance vs rescale vs keyBy
Эти три partitioning часто путают. Сравним:
| Стратегия | Distribution | Sky cost | When |
|---|---|---|---|
| keyBy(k) | hash(k) % N | Full shuffle | Нужен stateful aggregation per key |
| rebalance | round-robin | Full shuffle | Идеальный balance, no key needed |
| rescale | local round-robin (по possible) | Local shuffle | Balance, но network expensive |
| forward | 1-to-1 (same parallelism) | Free (chain) | Same parallelism, no partitioning |
Главное правило: используйте forward (default) max часто; rebalance только когда balance важен; keyBy только когда нужна группировка по ключу.
Попробуй сам
Расширьте свой fraud detection pipeline:
-
Поставьте explicit parallelism на каждом операторе: Source = 2, Map = 4, KeyedAggregate = 8, Sink = 4. Запустите. В Web UI — какие partitioning стрелки видите между операторами?
-
Добавьте
.rebalance()после map: что изменилось в Web UI? Где остался FORWARD, где появился REBALANCE? Performance отличается? -
Замените
.rebalance()на.rescale(): на одной машине вы не увидите разницы, но если бы у вас был кластер из 5 машин — какая стратегия была бы дешевле? -
Bonus: запустите кастомного partitioner на основе hash(country):
.partitionCustom(new CountryHashPartitioner(), e -> e.getCountry()). Что произойдёт, если у вас США = 80% траффика? (Это hot key, видите skew в Web UI.)