Learning Platform
Глоссарий Troubleshooting
Урок 05.01 · 18 мин
Средний
KeyedStreamkeyByKey GroupsParallelismState Partitioning

keyBy и key groups

Когда вы пишете Flink-job в DataStream API и хотите обрабатывать события “per user”, “per device” или “per order_id” — вы используете keyBy(). На первый взгляд это просто аналог GROUP BY из SQL: события группируются по ключу и попадают на один и тот же оператор. Но за этой простой операцией стоит механизм партиционирования, который определяет, как ваше состояние будет распределено между параллельными задачами и что произойдёт при изменении parallelism.

В этом уроке разбираем, что делает keyBy() под капотом, зачем нужны key groups, и как scale-операции (изменение parallelism) влияют на размещение состояния.


Что делает keyBy

keyBy() — это логическая операция перепартиционирования потока. Она не выполняет агрегацию и не накапливает состояние сама по себе. Её единственная задача — гарантировать, что все события с одним ключом попадают на одну и ту же параллельную задачу downstream-оператора.

DataStream<Order> orders = env.fromSource(kafkaSource, ...);

KeyedStream<Order, String> keyedOrders = orders
    .keyBy(order -> order.getUserId());

DataStream<Alert> alerts = keyedOrders
    .process(new FraudDetectionFunction());

После keyBy(userId) все заказы пользователя U-42 гарантированно попадут на одну задачу FraudDetectionFunction, независимо от того, на каком source-разделе они появились. Это фундаментальное свойство keyed state: состояние, накопленное для ключа U-42, хранится только на одной задаче, и Flink знает, на какой именно.

Python-эквивалент через PyFlink:

keyed_orders = orders.key_by(lambda order: order.user_id, key_type=Types.STRING())

alerts = keyed_orders.process(FraudDetectionFunction())

Ключевое отличие от обычного DataStream: до keyBy() вы работаете с потоком элементов, после — с потоком “групп”. Операторы downstream могут использовать ValueState, ListState, MapState — эти типы состояния доступны только после keyBy().


Хэш-партиционирование на пальцах

Как Flink решает, на какую задачу отправить событие? Простейшая модель — hash(key) % parallelism. Но эта формула катастрофически плоха для stateful-jobs: при изменении parallelism с 4 на 5 практически все ключи переедут на другие задачи, и восстановление из чекпойнта станет невозможным или потребует полной перетасовки состояния по сети.

Flink решает это через двухуровневое маппирование:

Key groups internals: детальный разбор маппирования
  1. Hash ключа определяет key group — стабильный идентификатор группы ключей.
  2. Key group -> parallel subtask — назначение, которое меняется при изменении parallelism.
key "U-42"
  -> hash("U-42") = 0x7A3C...
    -> keyGroup = (murmurHash & MAX_VALUE) % maxParallelism = 47
      -> subtask = (47 * parallelism) / maxParallelism = 2 (при parallelism=8, maxParallelism=128)

Хэш-функция — это MurmurHash (стандартный быстрый non-cryptographic hash). Деление и умножение на maxParallelism гарантирует, что ключи стабильно отображаются в один и тот же key group, а группы — равномерно в задачи.

keyBy: ключи -> key groups -> параллельные задачи
parallelism=4, maxParallelism=128

Source

Source оператор: читает события из Kafka. Каждое событие имеет ключ (например user_id). До keyBy события распределены по source-партициям без привязки к ключу.
Order { user: U-42 }

keyBy(userId)

keyBy(userId): применяет MurmurHash к ключу, вычисляет key group (0-127), затем определяет целевую subtask. Это shuffle-операция — события пересылаются по сети между TaskManager.
128 key groups равномерно распределены между 4 subtasks

Subtask 0 [KG 0-31]

Subtask 0: владеет key groups 0-31 (32 группы). Все ключи, чей hash попадает в эти группы, обрабатываются здесь. Состояние для этих ключей хранится в RocksDB локально на этом TaskManager.

Subtask 1 [KG 32-63]

Subtask 1: владеет key groups 32-63. user='U-42' хэшируется в KG 47 -> попадает сюда. Все следующие события с user='U-42' тоже попадут именно сюда.

Subtask 2 [KG 64-95]

Subtask 2: владеет key groups 64-95.

Subtask 3 [KG 96-127]

Subtask 3: владеет key groups 96-127.

Что такое key groups

Key group — это атомарная единица перераспределения состояния. Все ключи в одной key group всегда живут вместе: они хранятся в одних и тех же файлах чекпойнта и при rescale переезжают как одно целое.

Параметр pipeline.max-parallelism задаёт количество key groups для всего job-а. По умолчанию Flink выбирает его автоматически (от 128 до 32768 в зависимости от начального parallelism, обычно 1.5 * parallelism округлённое до удобного значения). После запуска job-а изменить maxParallelism нельзя — это нарушит привязку ключей к group-ам и сделает восстановление из savepoint невозможным.

Распределение между subtasks вычисляется как:

subtask_for_key_group(kg) = (kg * parallelism) / maxParallelism

При parallelism=4, maxParallelism=128:

  • KG 0-31 -> subtask 0
  • KG 32-63 -> subtask 1
  • KG 64-95 -> subtask 2
  • KG 96-127 -> subtask 3

При rescale на parallelism=8:

  • KG 0-15 -> subtask 0
  • KG 16-31 -> subtask 1
  • KG 32-47 -> subtask 2

Заметьте: ключ, который был в KG 47, при parallelism=4 жил на subtask 1, а при parallelism=8 переедет на subtask 2. Но он всё равно остался в KG 47. Восстановление возможно, потому что состояние сериализуется по key groups, и при загрузке Flink просто перераспределяет группы между новыми subtasks.

WARNING

maxParallelism нельзя изменить после первого checkpoint. Если вы стартуете с дефолтом 128 и потом захотите масштабироваться до 200 subtasks — упрётесь в потолок (нельзя иметь больше subtasks, чем key groups). Для production-job всегда задавайте maxParallelism явно с запасом: 720, 1440 или 2880 (числа с множеством делителей хорошо подходят).


Java и Python API

В Java keyBy() принимает KeySelector<IN, KEY> или ссылку на метод/лямбду:

// Один ключ
KeyedStream<Order, String> byUser = orders.keyBy(Order::getUserId);

// Композитный ключ через Tuple2 (тип KEY должен быть Serializable)
KeyedStream<Order, Tuple2<String, String>> byUserAndRegion =
    orders.keyBy(o -> Tuple2.of(o.getUserId(), o.getRegion()),
                 Types.TUPLE(Types.STRING, Types.STRING));

// Композитный ключ через POJO (предпочтительный паттерн)
public static class UserRegionKey {
    public String userId;
    public String region;
    public UserRegionKey() {}
    public UserRegionKey(String u, String r) { this.userId = u; this.region = r; }
    @Override public boolean equals(Object o) { /* ... */ }
    @Override public int hashCode() { /* ... */ }
}

KeyedStream<Order, UserRegionKey> byUserRegion =
    orders.keyBy(o -> new UserRegionKey(o.getUserId(), o.getRegion()));

В Python (PyFlink):

from pyflink.common.typeinfo import Types

# Простой ключ
by_user = orders.key_by(lambda o: o.user_id, key_type=Types.STRING())

# Композитный ключ через Row
from pyflink.common import Row

def make_key(o):
    return Row(user_id=o.user_id, region=o.region)

by_user_region = orders.key_by(
    make_key,
    key_type=Types.ROW_NAMED(
        ['user_id', 'region'],
        [Types.STRING(), Types.STRING()]
    )
)
TIP

Тип ключа должен корректно реализовывать equals и hashCode (для POJO) или быть Tuple/Row (Flink использует встроенную сериализацию). Никогда не используйте массивы как ключ — у них нет content-based equals, и каждый вызов keyBy будет отправлять событие в случайное место.


Влияние parallelism на распределение

При rescale Flink перераспределяет key groups равномерно. Идеальный случай — maxParallelism делится на parallelism нацело: тогда каждая subtask получает ровно maxParallelism / parallelism групп.

Пример: maxParallelism=128, parallelism=4 -> каждая subtask получает 32 group-ы.

Если parallelism не делится нацело, распределение становится неравномерным:

parallelismKG на subtaskПерекос
4320%
8160%
1210-1110%
177-814%
3240%

Перекос означает, что одна subtask может получать больше событий и хранить больше состояния, чем соседи. Для maxParallelism=128 это редко критично, но для maxParallelism=12 и parallelism=7 — катастрофа.

Практическое правило: выбирайте maxParallelism как сильно составное число (highly composite number). Хорошие варианты: 120, 240, 360, 720, 1440, 2880. Они делятся на множество значений parallelism, что даёт гибкость при масштабировании.


Data skew: проблема “горячих” ключей

keyBy() распределяет ключи равномерно по hash-у — но это работает только если ключи действительно разнообразны. Если 80% ваших событий имеют user_id = "guest", то 80% нагрузки уйдёт на одну subtask, независимо от parallelism.

Это называется data skew, и в Flink-сообществе это первая причина проблем с производительностью после неправильно настроенных watermarks.

Признаки skew:

  • Одна subtask стабильно использует 95-100% CPU, остальные — 20-30%.
  • Backpressure на upstream-операторе перед keyBy.
  • Размер состояния на одной subtask кратно больше, чем на других (видно в Flink Web UI -> Checkpoints -> Subtasks).

Решения:

  • Двухступенчатая агрегация для top-N и аналогичных задач: сначала pre-aggregate с расширенным ключом (user_id, random_salt % 16), потом ре-агрегация по чистому user_id.
  • Bucketing крупных ключей: для guest-трафика разбить искусственно: user_id = "guest-" + (hash(session_id) % 32).
  • MiniBatch aggregation в Table API (table.exec.mini-batch.enabled = true) для буферизации событий и амортизации стоимости keyBy.
NOTE

Skew не лечится “просто увеличить parallelism” — добавление subtask-ов перераспределяет key groups, но не меняет факта, что один ключ всегда живёт на одной задаче. Если этот ключ горячий — он останется горячим. Skew решается на уровне ключа, а не parallelism.


Что нельзя делать в keyBy

keyBy() — это не место для тяжёлой логики. Ключ вычисляется на каждое событие и должен быть:

  • Дешёвым: никаких сетевых вызовов, парсинга больших JSON, регулярных выражений.
  • Детерминированным: одно и то же событие должно давать один и тот же ключ при любом запуске.
  • Сериализуемым: ключ переезжает по сети между TaskManager-ами.

Антипаттерны:

// ПЛОХО: ключ зависит от внешнего состояния
.keyBy(event -> {
    return cache.lookup(event.id);  // что если cache мутирует?
})

// ПЛОХО: ключ нестабилен между запусками
.keyBy(event -> UUID.randomUUID().toString())  // никакой группировки не будет

// ПЛОХО: композитный ключ через String concatenation
.keyBy(o -> o.userId + ":" + o.region)
// Лучше: типизированный POJO или Tuple

// ПЛОХО: тяжёлый ключ
.keyBy(event -> hashPipeline(parseProtobuf(event.payload).getNested()))

Если нужно обогащение перед группировкой — сделайте отдельный map() или RichMapFunction ДО keyBy(), добавьте поле в payload, и потом keyBy() на это поле.


Попробуй сам

  1. Эксперимент с key groups. Запустите job на 4 subtask-ах с maxParallelism = 128. В оператор после keyBy() добавьте getRuntimeContext().getIndexOfThisSubtask() в лог. Прогоните 10000 событий с уникальными ключами и подсчитайте, сколько ключей попало на каждую subtask. Ожидаемое распределение — близкое к равномерному (по 2500 на subtask).

  2. Rescale без потери state. На той же job-е сделайте savepoint, остановите, увеличьте parallelism до 8 и restore. Убедитесь, что суммарное количество ключей сохранилось, но распределение между subtask изменилось.

  3. Найдите skew. Замените ключ на event -> event.userId.startsWith("guest") ? "guest" : event.userId. Запустите снова и посмотрите на Web UI: одна из subtask должна получать сильно больше нагрузки. Это упражнение покажет, как выглядит data skew в Flink Web UI.

Проверка знанийKnowledge check
Вы запустили Flink-job с parallelism=4 и maxParallelism=128, накопили час состояния, сделали savepoint и хотите масштабироваться до parallelism=200. Что произойдёт?
ОтветAnswer
Восстановление из savepoint завершится с ошибкой. Parallelism не может превышать maxParallelism — это потолок, заданный при первом запуске и записанный в каждый чекпойнт/savepoint. Ваши 200 subtasks не имеют места для размещения: всего 128 key groups, а одна группа не может быть разделена между двумя subtask. Решение на будущее: всегда задавайте pipeline.max-parallelism явно с запасом (например 720 или 1440). Решение для текущей ситуации: либо смириться с потолком 128, либо запустить новый job с большим maxParallelism и переиграть данные с начала (или использовать state processor API для миграции состояния, что нетривиально).

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 5. Job был запущен с parallelism=4 и без явного указания pipeline.max-parallelism. После 2 недель работы команда хочет масштабироваться до parallelism=300. Что произойдёт при restore из последнего savepoint?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4