Learning Platform
Глоссарий Troubleshooting
Урок 04.04 · 20 мин
Средний
ParallelismPartitioningShuffleRebalanceKeyBy

Parallelism и partitioning стратегии

Когда оператор имеет parallelism > 1, нужно решить: как распределять события между параллельными subtasks следующего оператора? В Flink есть несколько стратегий partitioning, каждая со своим trade-off между producer-consumer balance, ordering guarantees, и сетевой нагрузкой. Этот урок — про практическое применение всех стратегий.

К концу урока вы будете понимать, какая стратегия применяется в каждой точке вашего pipeline (Web UI отображает их явно), и сможете осознанно выбирать .rebalance() vs .rescale() vs другие.


Parallelism: фундамент

parallelism оператора — это количество его параллельных subtasks. Чем выше, тем больше throughput, но и больше overhead (state distribution, coordination).

// Default parallelism для всего job
env.setParallelism(4);

// Override на отдельный оператор
stream
    .map(...).setParallelism(2)   // только 2 subtasks для map
    .keyBy(...)
    .sum(1).setParallelism(8);    // 8 subtasks для sum

Maximum parallelism: hard limit, который задаётся на job. Влияет на структуру keyed state (key groups). Изменить max parallelism после первого запуска нельзя (он закодирован в state). Поэтому ставьте его щедро:

env.setMaxParallelism(256);  // позволит scale up до 256 без recompute state

По умолчанию max parallelism = 128 (для маленьких parallelism) или вычисляется автоматически.


Partitioning: как распределяются события

Между двумя соседними операторами Flink применяет одну из стратегий partitioning. Каждая определяет, как событие из subtask A.i попадает в subtask B.j.

Partitioning стратегии: visual обзор
FORWARD (1-to-1)Каждый subtask отправляет только в соответствующий subtask down-stream. A.0 -> B.0, A.1 -> B.1. Применяется при одинаковом parallelism и нет shuffle. Бесплатно — в одном thread, нет сериализации.
HASH (keyBy)Hash(key) % parallelism. Все события с одинаковым ключом в один subtask. Применяется после keyBy. Shuffle по сети.
REBALANCE (round-robin)Round-robin между всеми down-stream subtasks. Каждое следующее событие в следующий subtask по circular. Идеальный balance, но full shuffle.
RESCALE (local round-robin)Round-robin, но только между subtasks down-stream operator на ТОМ ЖЕ TaskManager (по возможности). Меньше сетевой нагрузки, чем rebalance.
BROADCASTКаждое событие отправляется во ВСЕ subtasks down-stream. Дорого по сети, но необходимо для broadcast state pattern (configs, dictionaries).
SHUFFLE (random)Случайное распределение между subtasks. Хороший balance в среднем, но непредсказуем. Редко используется явно — rebalance обычно лучше.
CUSTOMСвой Partitioner с произвольной логикой. Используется для специфических случаев — например, partitioning по time bucket или по custom key.
GLOBALВсе события в один subtask down-stream (parallelism = 1 для down-stream). Используется для финального aggregation, sort. Bottleneck — не для high throughput.

Forward: 1-to-1 без shuffle

Forward — самый дешёвый. Это когда два соседних оператора имеют одинаковый parallelism и нет explicit partitioning между ними.

DataStream<String> mapped = source.map(...);   // parallelism = 4
DataStream<String> filtered = mapped.filter(...);  // parallelism = 4

// Между map и filter — FORWARD. Они даже chain'ятся в один оператор в Web UI.

В Web UI стрелка между ними подписана FORWARD. Это и есть chain — нет shuffle, события передаются как Java objects в одном thread.

Когда forward автоматически:

  • Соседние операторы одинакового parallelism.
  • Нет keyBy / rebalance / rescale / broadcast / partitionCustom между ними.

Когда forward автоматически НЕ применяется:

  • Разный parallelism — Flink не может 1-to-1 (например, 4 -> 2).
  • keyBy или другой explicit partitioning.

keyBy (hash): для stateful aggregation

Самый частый partitioning после forward. Уже разобран в уроке 03.2.

stream.keyBy(event -> event.getUserId())
      .sum(...);

Что происходит: hash(userId) % parallelism. Все события одного user — в один subtask. Это и есть основа keyed state.

Cost: shuffle по сети. Если subtasks на разных TaskManager’ах — сериализация и сетевая передача.

В Web UI стрелка после keyBy — HASH.


rebalance: round-robin

Когда нужен идеальный balance нагрузки, но не нужно keyed state — используйте rebalance():

DataStream<Event> imbalanced = source;  // parallelism = 1 (CDC source)
DataStream<Event> balanced = imbalanced.rebalance();  // round-robin к down-stream
DataStream<Event> processed = balanced.map(...).setParallelism(8);

Каждое событие отправляется в следующий subtask по circular: subtask 0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, … Это даёт perfect balance.

Когда полезно:

  • После single-threaded source (CDC, file source с одним splitter).
  • Для распределения нагрузки на CPU-heavy операторе.
  • После filter с большой selectivity (event drop пропорционально на subtask).

Cost: full shuffle — каждое событие может идти к любому subtask, включая на другом TaskManager.

В Web UI стрелка — REBALANCE.


rescale: local round-robin

Похож на rebalance, но только в пределах одного TaskManager (по возможности).

stream.rescale().map(...).setParallelism(4);

Если up-stream subtask на TM1, и на TM1 есть 2 down-stream subtasks, и на TM2 — другие 2, тогда:

  • rescale: up-stream на TM1 отправляет только в 2 subtasks на TM1 (без сети).
  • rebalance: up-stream на TM1 отправляет во все 4 subtasks (включая на TM2 — по сети).

Когда полезно:

  • Когда нужен balance, но overhead сети нежелателен.
  • При больших volumes, где сетевая нагрузка существенна.

Cost: дешевле rebalance, но balance менее perfect (если ratio не одинаковый, некоторые TMs получат больше).

В Web UI стрелка — RESCALE.


broadcast: на все subtasks

Каждое событие отправляется во ВСЕ subtasks down-stream:

DataStream<Rule> rules = source;
BroadcastStream<Rule> broadcastRules = rules.broadcast(rulesDescriptor);

// Используется в connect() с другим stream
DataStream<EnrichedEvent> enriched = mainStream
    .connect(broadcastRules)
    .process(new BroadcastEnrichFunction());

Когда нужно:

  • Конфигурация / dictionary, которое нужно знать всем subtasks.
  • Правила фильтрации, которые меняются on-the-fly (broadcast state pattern — модуль 08).
  • Маленькие справочники (страны, currency codes).

Cost: высокий — каждое событие N раз по сети (N = parallelism). Не для high-throughput streams. Подходит только для маленьких control streams (правила, конфигурация).

В Web UI стрелка — BROADCAST.

Partitioning в Kafka: похожая логика распределения

partitionCustom: своя логика

Для специфических случаев — свой Partitioner:

public class TimeBucketPartitioner implements Partitioner<Long> {
    @Override
    public int partition(Long timestamp, int numPartitions) {
        long bucket = timestamp / 3600_000L;  // hour bucket
        return (int) (bucket % numPartitions);
    }
}

stream.partitionCustom(new TimeBucketPartitioner(), event -> event.getTimestamp());

Это HASH с custom hash function. Используется редко — обычно достаточно keyBy или rebalance.

В Web UI стрелка — CUSTOM.


global: всё в один subtask

.global() — отправляет всё в subtask 0 down-stream. Down-stream получает parallelism = 1 эффективно.

stream.map(...).setParallelism(8)
      .global()
      .map(...);  // эта map будет иметь parallelism = 1

Когда использовать:

  • Финальная агрегация всех metrics в одно место.
  • Sort всего потока (только если он bounded).
  • Очень редко — обычно есть лучшие альтернативы.

Опасность: bottleneck — single thread обрабатывает всё. Не для high throughput.


Композиция: реальный pipeline

Production pipeline: разные partitioning

Postgres CDC (parallelism=1)

Postgres CDC source: parallelism = 1 (логически single-threaded — читает WAL). Это immediately bottleneck.
rebalance (round-robin)

Map + Filter (parallelism=8)

Map + Filter: разбирает CDC record, фильтрует non-relevant. parallelism = 8. Rebalance от source — каждое сообщение в следующий subtask по circular для perfect balance.
forward (1-to-1)

Enrich (parallelism=8, FORWARD)

Enrich (lookup country): добавляет country по IP через RichMap. parallelism = 8 — соответствует upstream, поэтому FORWARD. Дешевле.
hash (keyBy)

KeyedAggregate (parallelism=8, HASH by userId)

KeyedAggregate: keyBy(userId) для stateful aggregation. parallelism = 8. Hash partitioning — все события одного user в один subtask.
forward + broadcast

Kafka Sink (parallelism=8, FORWARD)

Kafka Sink: parallelism = 8 (matches partitions). Forward от KeyedAggregate.

Metrics Sink (parallelism=1, GLOBAL)

Metrics broadcast: отправляет агрегаты во все subtasks Metrics Aggregator (parallelism=1) для финального reporting. BROADCAST по small stream.

Реальная картина: в одном pipeline используются разные partitioning стратегии в зависимости от потребности на каждом шаге.


Production tips

  1. Default parallelism set explicitly: не полагайтесь на default. env.setParallelism(N) в начале job — best practice.

  2. Override parallelism на heavy операторах: если знаете, что Map дешевый, а Aggregate тяжёлый — .setParallelism() на каждом операторе индивидуально.

  3. Max parallelism set generously: env.setMaxParallelism(256) — позволяет future-scale без recompute state. Не используйте default.

  4. Rebalance после single-threaded source: КРИТИЧНО. CDC source даёт parallelism=1; без rebalance вся pipeline становится single-threaded.

  5. Не используйте rebalance перед keyBy: keyBy сам делает shuffle. Rebalance впустую перераспределит, и потом keyBy опять перераспределит.

  6. Forward chain — оптимизация: если операторы одного parallelism и нет partitioning между ними — они chain’ятся, что бесплатно. Не нарушайте chain без причины.

  7. partition.discovery.interval.ms: для KafkaSource — если в продакшене добавляете партиции, source их подхватит автоматически.


Distinguishing: rebalance vs rescale vs keyBy

Эти три partitioning часто путают. Сравним:

СтратегияDistributionSky costWhen
keyBy(k)hash(k) % NFull shuffleНужен stateful aggregation per key
rebalanceround-robinFull shuffleИдеальный balance, no key needed
rescalelocal round-robin (по possible)Local shuffleBalance, но network expensive
forward1-to-1 (same parallelism)Free (chain)Same parallelism, no partitioning

Главное правило: используйте forward (default) max часто; rebalance только когда balance важен; keyBy только когда нужна группировка по ключу.


Попробуй сам

Расширьте свой fraud detection pipeline:

  1. Поставьте explicit parallelism на каждом операторе: Source = 2, Map = 4, KeyedAggregate = 8, Sink = 4. Запустите. В Web UI — какие partitioning стрелки видите между операторами?

  2. Добавьте .rebalance() после map: что изменилось в Web UI? Где остался FORWARD, где появился REBALANCE? Performance отличается?

  3. Замените .rebalance() на .rescale(): на одной машине вы не увидите разницы, но если бы у вас был кластер из 5 машин — какая стратегия была бы дешевле?

  4. Bonus: запустите кастомного partitioner на основе hash(country): .partitionCustom(new CountryHashPartitioner(), e -> e.getCountry()). Что произойдёт, если у вас США = 80% траффика? (Это hot key, видите skew в Web UI.)

Проверка знанийKnowledge check
У вас Flink pipeline: KafkaSource (parallelism=4) -> Map (parallelism=4) -> KeyBy(userId) -> ProcessFunction (parallelism=4). KafkaSource имеет 4 партиции в Kafka. Какой partitioning между Source и Map, какой между Map и ProcessFunction, и почему важно понимать это для скейлинга?
ОтветAnswer
Между Source и Map: FORWARD (1-to-1). Одинаковый parallelism, нет explicit partitioning, поэтому Flink chain'ит их в один оператор. Каждый subtask Source отправляет напрямую в свой subtask Map, без сериализации и сетевой передачи. Между Map и ProcessFunction: HASH (после keyBy). Каждое событие отправляется в subtask 'hash(userId) % 4'. Это shuffle — события могут идти к любому subtask, включая на другом TaskManager. Для скейлинга важно: (1) parallelism Source ограничен числом Kafka партиций (4) — больше parallelism не даст больше throughput, нужно увеличить партиции в Kafka; (2) parallelism ProcessFunction (stateful) можно увеличить, но потребуется restart с savepoint и Flink перераспределит keyed state между новыми subtasks. (3) При scale-up через rescaling, keyBy переразобьёт ключи между новыми subtasks — это нормально, state migration через savepoint.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Postgres CDC source имеет parallelism = 1 (логически single-threaded). После него map с parallelism = 8. Без explicit partitioning между ними что произойдёт?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5