keyBy и key groups
Когда вы пишете Flink-job в DataStream API и хотите обрабатывать события “per user”, “per device” или “per order_id” — вы используете keyBy(). На первый взгляд это просто аналог GROUP BY из SQL: события группируются по ключу и попадают на один и тот же оператор. Но за этой простой операцией стоит механизм партиционирования, который определяет, как ваше состояние будет распределено между параллельными задачами и что произойдёт при изменении parallelism.
В этом уроке разбираем, что делает keyBy() под капотом, зачем нужны key groups, и как scale-операции (изменение parallelism) влияют на размещение состояния.
Что делает keyBy
keyBy() — это логическая операция перепартиционирования потока. Она не выполняет агрегацию и не накапливает состояние сама по себе. Её единственная задача — гарантировать, что все события с одним ключом попадают на одну и ту же параллельную задачу downstream-оператора.
DataStream<Order> orders = env.fromSource(kafkaSource, ...);
KeyedStream<Order, String> keyedOrders = orders
.keyBy(order -> order.getUserId());
DataStream<Alert> alerts = keyedOrders
.process(new FraudDetectionFunction());
После keyBy(userId) все заказы пользователя U-42 гарантированно попадут на одну задачу FraudDetectionFunction, независимо от того, на каком source-разделе они появились. Это фундаментальное свойство keyed state: состояние, накопленное для ключа U-42, хранится только на одной задаче, и Flink знает, на какой именно.
Python-эквивалент через PyFlink:
keyed_orders = orders.key_by(lambda order: order.user_id, key_type=Types.STRING())
alerts = keyed_orders.process(FraudDetectionFunction())
Ключевое отличие от обычного DataStream: до keyBy() вы работаете с потоком элементов, после — с потоком “групп”. Операторы downstream могут использовать ValueState, ListState, MapState — эти типы состояния доступны только после keyBy().
Хэш-партиционирование на пальцах
Как Flink решает, на какую задачу отправить событие? Простейшая модель — hash(key) % parallelism. Но эта формула катастрофически плоха для stateful-jobs: при изменении parallelism с 4 на 5 практически все ключи переедут на другие задачи, и восстановление из чекпойнта станет невозможным или потребует полной перетасовки состояния по сети.
Flink решает это через двухуровневое маппирование:
Key groups internals: детальный разбор маппирования- Hash ключа определяет key group — стабильный идентификатор группы ключей.
- Key group -> parallel subtask — назначение, которое меняется при изменении parallelism.
key "U-42"
-> hash("U-42") = 0x7A3C...
-> keyGroup = (murmurHash & MAX_VALUE) % maxParallelism = 47
-> subtask = (47 * parallelism) / maxParallelism = 2 (при parallelism=8, maxParallelism=128)
Хэш-функция — это MurmurHash (стандартный быстрый non-cryptographic hash). Деление и умножение на maxParallelism гарантирует, что ключи стабильно отображаются в один и тот же key group, а группы — равномерно в задачи.
Source
Source оператор: читает события из Kafka. Каждое событие имеет ключ (например user_id). До keyBy события распределены по source-партициям без привязки к ключу.keyBy(userId)
keyBy(userId): применяет MurmurHash к ключу, вычисляет key group (0-127), затем определяет целевую subtask. Это shuffle-операция — события пересылаются по сети между TaskManager.Subtask 0 [KG 0-31]
Subtask 0: владеет key groups 0-31 (32 группы). Все ключи, чей hash попадает в эти группы, обрабатываются здесь. Состояние для этих ключей хранится в RocksDB локально на этом TaskManager.Subtask 1 [KG 32-63]
Subtask 1: владеет key groups 32-63. user='U-42' хэшируется в KG 47 -> попадает сюда. Все следующие события с user='U-42' тоже попадут именно сюда.Subtask 2 [KG 64-95]
Subtask 2: владеет key groups 64-95.Subtask 3 [KG 96-127]
Subtask 3: владеет key groups 96-127.Что такое key groups
Key group — это атомарная единица перераспределения состояния. Все ключи в одной key group всегда живут вместе: они хранятся в одних и тех же файлах чекпойнта и при rescale переезжают как одно целое.
Параметр pipeline.max-parallelism задаёт количество key groups для всего job-а. По умолчанию Flink выбирает его автоматически (от 128 до 32768 в зависимости от начального parallelism, обычно 1.5 * parallelism округлённое до удобного значения). После запуска job-а изменить maxParallelism нельзя — это нарушит привязку ключей к group-ам и сделает восстановление из savepoint невозможным.
Распределение между subtasks вычисляется как:
subtask_for_key_group(kg) = (kg * parallelism) / maxParallelism
При parallelism=4, maxParallelism=128:
- KG 0-31 -> subtask 0
- KG 32-63 -> subtask 1
- KG 64-95 -> subtask 2
- KG 96-127 -> subtask 3
При rescale на parallelism=8:
- KG 0-15 -> subtask 0
- KG 16-31 -> subtask 1
- KG 32-47 -> subtask 2
- …
Заметьте: ключ, который был в KG 47, при parallelism=4 жил на subtask 1, а при parallelism=8 переедет на subtask 2. Но он всё равно остался в KG 47. Восстановление возможно, потому что состояние сериализуется по key groups, и при загрузке Flink просто перераспределяет группы между новыми subtasks.
maxParallelism нельзя изменить после первого checkpoint. Если вы стартуете с дефолтом 128 и потом захотите масштабироваться до 200 subtasks — упрётесь в потолок (нельзя иметь больше subtasks, чем key groups). Для production-job всегда задавайте maxParallelism явно с запасом: 720, 1440 или 2880 (числа с множеством делителей хорошо подходят).
Java и Python API
В Java keyBy() принимает KeySelector<IN, KEY> или ссылку на метод/лямбду:
// Один ключ
KeyedStream<Order, String> byUser = orders.keyBy(Order::getUserId);
// Композитный ключ через Tuple2 (тип KEY должен быть Serializable)
KeyedStream<Order, Tuple2<String, String>> byUserAndRegion =
orders.keyBy(o -> Tuple2.of(o.getUserId(), o.getRegion()),
Types.TUPLE(Types.STRING, Types.STRING));
// Композитный ключ через POJO (предпочтительный паттерн)
public static class UserRegionKey {
public String userId;
public String region;
public UserRegionKey() {}
public UserRegionKey(String u, String r) { this.userId = u; this.region = r; }
@Override public boolean equals(Object o) { /* ... */ }
@Override public int hashCode() { /* ... */ }
}
KeyedStream<Order, UserRegionKey> byUserRegion =
orders.keyBy(o -> new UserRegionKey(o.getUserId(), o.getRegion()));
В Python (PyFlink):
from pyflink.common.typeinfo import Types
# Простой ключ
by_user = orders.key_by(lambda o: o.user_id, key_type=Types.STRING())
# Композитный ключ через Row
from pyflink.common import Row
def make_key(o):
return Row(user_id=o.user_id, region=o.region)
by_user_region = orders.key_by(
make_key,
key_type=Types.ROW_NAMED(
['user_id', 'region'],
[Types.STRING(), Types.STRING()]
)
)
Тип ключа должен корректно реализовывать equals и hashCode (для POJO) или быть Tuple/Row (Flink использует встроенную сериализацию). Никогда не используйте массивы как ключ — у них нет content-based equals, и каждый вызов keyBy будет отправлять событие в случайное место.
Влияние parallelism на распределение
При rescale Flink перераспределяет key groups равномерно. Идеальный случай — maxParallelism делится на parallelism нацело: тогда каждая subtask получает ровно maxParallelism / parallelism групп.
Пример: maxParallelism=128, parallelism=4 -> каждая subtask получает 32 group-ы.
Если parallelism не делится нацело, распределение становится неравномерным:
| parallelism | KG на subtask | Перекос |
|---|---|---|
| 4 | 32 | 0% |
| 8 | 16 | 0% |
| 12 | 10-11 | 10% |
| 17 | 7-8 | 14% |
| 32 | 4 | 0% |
Перекос означает, что одна subtask может получать больше событий и хранить больше состояния, чем соседи. Для maxParallelism=128 это редко критично, но для maxParallelism=12 и parallelism=7 — катастрофа.
Практическое правило: выбирайте maxParallelism как сильно составное число (highly composite number). Хорошие варианты: 120, 240, 360, 720, 1440, 2880. Они делятся на множество значений parallelism, что даёт гибкость при масштабировании.
Data skew: проблема “горячих” ключей
keyBy() распределяет ключи равномерно по hash-у — но это работает только если ключи действительно разнообразны. Если 80% ваших событий имеют user_id = "guest", то 80% нагрузки уйдёт на одну subtask, независимо от parallelism.
Это называется data skew, и в Flink-сообществе это первая причина проблем с производительностью после неправильно настроенных watermarks.
Признаки skew:
- Одна subtask стабильно использует 95-100% CPU, остальные — 20-30%.
- Backpressure на upstream-операторе перед
keyBy. - Размер состояния на одной subtask кратно больше, чем на других (видно в Flink Web UI -> Checkpoints -> Subtasks).
Решения:
- Двухступенчатая агрегация для top-N и аналогичных задач: сначала pre-aggregate с расширенным ключом
(user_id, random_salt % 16), потом ре-агрегация по чистому user_id. - Bucketing крупных ключей: для guest-трафика разбить искусственно:
user_id = "guest-" + (hash(session_id) % 32). - MiniBatch aggregation в Table API (
table.exec.mini-batch.enabled = true) для буферизации событий и амортизации стоимости keyBy.
Skew не лечится “просто увеличить parallelism” — добавление subtask-ов перераспределяет key groups, но не меняет факта, что один ключ всегда живёт на одной задаче. Если этот ключ горячий — он останется горячим. Skew решается на уровне ключа, а не parallelism.
Что нельзя делать в keyBy
keyBy() — это не место для тяжёлой логики. Ключ вычисляется на каждое событие и должен быть:
- Дешёвым: никаких сетевых вызовов, парсинга больших JSON, регулярных выражений.
- Детерминированным: одно и то же событие должно давать один и тот же ключ при любом запуске.
- Сериализуемым: ключ переезжает по сети между TaskManager-ами.
Антипаттерны:
// ПЛОХО: ключ зависит от внешнего состояния
.keyBy(event -> {
return cache.lookup(event.id); // что если cache мутирует?
})
// ПЛОХО: ключ нестабилен между запусками
.keyBy(event -> UUID.randomUUID().toString()) // никакой группировки не будет
// ПЛОХО: композитный ключ через String concatenation
.keyBy(o -> o.userId + ":" + o.region)
// Лучше: типизированный POJO или Tuple
// ПЛОХО: тяжёлый ключ
.keyBy(event -> hashPipeline(parseProtobuf(event.payload).getNested()))
Если нужно обогащение перед группировкой — сделайте отдельный map() или RichMapFunction ДО keyBy(), добавьте поле в payload, и потом keyBy() на это поле.
Попробуй сам
-
Эксперимент с key groups. Запустите job на 4 subtask-ах с
maxParallelism = 128. В оператор послеkeyBy()добавьтеgetRuntimeContext().getIndexOfThisSubtask()в лог. Прогоните 10000 событий с уникальными ключами и подсчитайте, сколько ключей попало на каждую subtask. Ожидаемое распределение — близкое к равномерному (по 2500 на subtask). -
Rescale без потери state. На той же job-е сделайте savepoint, остановите, увеличьте parallelism до 8 и restore. Убедитесь, что суммарное количество ключей сохранилось, но распределение между subtask изменилось.
-
Найдите skew. Замените ключ на
event -> event.userId.startsWith("guest") ? "guest" : event.userId. Запустите снова и посмотрите на Web UI: одна из subtask должна получать сильно больше нагрузки. Это упражнение покажет, как выглядит data skew в Flink Web UI.