Keyed state: паттерны и анти-паттерны
Понимание API ValueState/ListState/MapState — это базис. Но реальные production-jobs строятся из узнаваемых паттернов: дедупликация, sessionization, alerting, FSM. Каждый паттерн имеет свои ловушки — и каждая ловушка хорошо известна тем, кто уже однажды сжёг 50 GB state на проде.
В этом уроке — четыре рабочих паттерна и четыре анти-паттерна, которые встречаются в большинстве code review.
Паттерн 1: дедупликация
Задача: на потоке событий с возможными дубликатами (например, at-least-once delivery из Kafka) пропускать только первое появление каждого event_id.
public class Deduplicator extends RichFilterFunction<Event> {
private transient ValueState<Boolean> seen;
@Override
public void open(OpenContext ctx) {
ValueStateDescriptor<Boolean> desc =
new ValueStateDescriptor<>("seen", Boolean.class);
desc.enableTimeToLive(StateTtlConfig
.newBuilder(Time.hours(24))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.cleanupInRocksdbCompactFilter(1000)
.build());
seen = getRuntimeContext().getState(desc);
}
@Override
public boolean filter(Event event) throws Exception {
if (seen.value() != null) return false;
seen.update(true);
return true;
}
}
events.keyBy(Event::getEventId)
.filter(new Deduplicator());
Что важно:
keyBy(eventId)означает миллиарды ключей. Без TTL это превратится в безразмерный set.- TTL должен быть больше потенциального окна дубликатов. Если at-least-once Kafka может задержать дубликат на 1 час максимум — ставьте TTL 24 часа с запасом.
Booleanкак тип — формальность; вам не нужно значение, только сам факт ключа. Альтернатива — пустойValueState<Byte>для минимизации overhead.
Дедупликация через keyBy(eventId) — это правильно, но cardinality eventId часто сильно недооценивают. Прикиньте: 10K событий в секунду * 86400 секунд * 24 часа TTL = 20.7 миллиарда ключей в RocksDB. С учётом overhead (timestamp + framework) — терабайты state. Для high-throughput pipelines рассмотрите probabilistic дедупликацию через Bloom filter в state.
Паттерн 2: alert state с гистерезисом
Задача: оператор должен генерировать alert, когда метрика пользователя превышает порог, и НЕ слать повторные alerts, пока не было recovery.
public class ThresholdAlert extends RichFlatMapFunction<Metric, Alert> {
private transient ValueState<Boolean> alertActive;
@Override
public void open(OpenContext ctx) {
alertActive = getRuntimeContext().getState(
new ValueStateDescriptor<>("alert-active", Boolean.class));
}
@Override
public void flatMap(Metric m, Collector<Alert> out) throws Exception {
Boolean active = alertActive.value();
if (m.value > 100 && active == null) {
out.collect(new Alert(m.userId, "THRESHOLD_EXCEEDED", m.value));
alertActive.update(true);
} else if (m.value < 80 && active != null) {
out.collect(new Alert(m.userId, "RECOVERED", m.value));
alertActive.clear();
}
}
}
Это FSM с двумя состояниями: “normal” (state == null) и “alerting” (state == true). Гистерезис — разрыв между порогом срабатывания (100) и порогом recovery (80) — предотвращает flapping на пограничных значениях.
Паттерн обобщается на любые FSM. State хранит текущее состояние; transitions определяются логикой flatMap/processElement.
Для multi-state FSM используйте enum вместо boolean: ValueState<UserStatus>, где UserStatus = ACTIVE, SUSPENDED, BANNED, etc. Это делает код самодокументирующимся и предотвращает магию “null = специальное состояние”.
Паттерн 3: sessionization вручную
Session window (см. модуль про windows) автоматически группирует события в сессии по gap-у. Но иногда нужна кастомная логика: переменный gap, обрыв сессии по бизнес-событию, multi-stage сессии. В таких случаях sessionization делается через keyed state.
public class SessionTracker extends KeyedProcessFunction<String, Event, Session> {
private transient ValueState<SessionMeta> currentSession;
private transient ListState<Event> sessionEvents;
@Override
public void open(OpenContext ctx) {
currentSession = getRuntimeContext().getState(
new ValueStateDescriptor<>("session-meta", SessionMeta.class));
sessionEvents = getRuntimeContext().getListState(
new ListStateDescriptor<>("session-events", Event.class));
}
@Override
public void processElement(Event ev, Context ctx, Collector<Session> out) throws Exception {
SessionMeta meta = currentSession.value();
long gapMs = 30 * 60 * 1000;
if (meta == null) {
// Новая сессия
meta = new SessionMeta(ev.timestamp, ev.timestamp);
sessionEvents.add(ev);
currentSession.update(meta);
ctx.timerService().registerEventTimeTimer(ev.timestamp + gapMs);
} else if (ev.timestamp - meta.lastSeen > gapMs) {
// Gap превышен — закрываем старую, открываем новую
emitSession(meta, out);
sessionEvents.clear();
meta = new SessionMeta(ev.timestamp, ev.timestamp);
sessionEvents.add(ev);
currentSession.update(meta);
ctx.timerService().registerEventTimeTimer(ev.timestamp + gapMs);
} else {
// Продолжение сессии
sessionEvents.add(ev);
meta.lastSeen = ev.timestamp;
currentSession.update(meta);
ctx.timerService().registerEventTimeTimer(ev.timestamp + gapMs);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Session> out) throws Exception {
SessionMeta meta = currentSession.value();
if (meta != null && timestamp >= meta.lastSeen + 30 * 60 * 1000) {
emitSession(meta, out);
sessionEvents.clear();
currentSession.clear();
}
}
private void emitSession(SessionMeta meta, Collector<Session> out) throws Exception {
List<Event> events = new ArrayList<>();
sessionEvents.get().forEach(events::add);
out.collect(new Session(meta.start, meta.lastSeen, events));
}
}
Зачем это вместо стандартного session window?
- Контроль над дополнительной логикой “закрыть сессию”: например, при появлении события
logoutилиpayment_completed. - Возможность дробить сессии по бизнес-правилам (одна сессия = один заказ, а не gap-30min).
- Custom метаданные сессии (счётчики событий по типу, агрегаты на лету), которые не вписываются в
WindowFunction.
Паттерн 4: top-K через MapState
Задача: для каждого пользователя поддерживать top-10 самых популярных категорий.
public class TopK extends KeyedProcessFunction<String, Event, TopKReport> {
private transient MapState<String, Long> categoryCounts;
private static final int K = 10;
@Override
public void open(OpenContext ctx) {
categoryCounts = getRuntimeContext().getMapState(
new MapStateDescriptor<>("cat-counts", Types.STRING, Types.LONG));
}
@Override
public void processElement(Event ev, Context ctx, Collector<TopKReport> out) throws Exception {
Long c = categoryCounts.get(ev.category);
categoryCounts.put(ev.category, (c == null ? 0 : c) + 1);
// Раз в N событий — пересчёт top-K
if (ev.timestamp % 100 == 0) {
PriorityQueue<Map.Entry<String, Long>> heap = new PriorityQueue<>(
(a, b) -> Long.compare(a.getValue(), b.getValue())
);
for (Map.Entry<String, Long> entry : categoryCounts.entries()) {
heap.offer(entry);
if (heap.size() > K) heap.poll();
}
List<Map.Entry<String, Long>> topK = new ArrayList<>(heap);
topK.sort((a, b) -> Long.compare(b.getValue(), a.getValue()));
out.collect(new TopKReport(ev.userId, topK));
}
}
}
Тонкости:
MapState.entries()— итерация по всему словарю, O(N) от размера мапы. Не делайте на каждом событии, если категорий тысячи.- Для high-cardinality (миллион категорий) heap-based top-K превращается в bottleneck. Альтернатива: храните top-K explicit в отдельном ValueState
<TreeMap>и обновляйте инкрементально (но это сложнее). - При rescale всё state поедет на новый subtask “цельным куском” — гарантируется консистентность top-K.
Анти-паттерн 1: unbounded GROUP BY
events.keyBy(e -> e.transactionId) // никогда не повторяется
.reduce(...)
transaction_id уникален per event, и keyBy(transactionId) создаёт ключ на каждое событие. После 1 миллиарда событий — 1 миллиард keys в state, без возможности reuse.
Признак того, что вы попали в эту ловушку: state monotonically растёт, и в Web UI у вас по 10K-100K новых keys в минуту. Если cardinality ключа теоретически unbounded, нужно либо TTL, либо радикальный пересмотр архитектуры:
- Дедупликация — окей с TTL.
- Аналитика per транзакция — обычно НЕ нужно state, можно просто
flatMapбез keyBy. - Сессионизация — keyBy(user_id), а transaction_id внутри payload.
Анти-паттерн 2: big state per key
public class History extends RichFlatMapFunction<Event, Anomaly> {
private transient ValueState<List<Event>> fullHistory; // ВСЯ история пользователя!
@Override
public void flatMap(Event e, Collector<Anomaly> out) throws Exception {
List<Event> hist = fullHistory.value();
if (hist == null) hist = new ArrayList<>();
hist.add(e);
fullHistory.update(hist);
// ... анализ всей истории
}
}
Каждое событие — это read + append + write всей истории целиком. При истории в 10000 событий каждое новое событие читает и пишет ~10000 объектов в RocksDB. CPU и I/O пропорциональны размеру state, не количеству новых событий.
Правильно:
- Если нужно “последние N” — используйте sliding window или явный circular buffer (ListState с обрезкой при достижении N).
- Если нужны агрегаты — храните агрегаты, а не raw данные (mean, variance, exponential moving average).
- Если нужен random access по items — MapState вместо ValueState
<List>.
Анти-паттерн 3: hot key
events.keyBy(e -> e.userId.startsWith("guest") ? "guest" : e.userId)
Все гостевые сессии попадают на одну задачу. State для ключа "guest" будет огромным, CPU subtask-а 0 — на 100%, остальные простаивают.
Решение — двухступенчатая агрегация:
// Этап 1: дробим guest на 32 виртуальных ключа
events
.keyBy(e -> e.userId.startsWith("guest")
? "guest-" + (Math.abs(e.sessionId.hashCode()) % 32)
: e.userId)
.timeWindow(Time.minutes(1))
.reduce(...)
// Этап 2: собираем guest-* обратно в один
.keyBy(metric -> metric.userId.startsWith("guest-") ? "guest" : metric.userId)
.reduce(...)
Этот паттерн распространён в Spark, Flink, Kafka Streams под названием salt + reduce.
State stores в Kafka Streams: аналогичный keyed stateАнти-паттерн 4: state без watermark/cleanup в Process Function
public class PendingOrders extends KeyedProcessFunction<String, Order, ...> {
private transient MapState<String, Order> pending; // orderId -> Order
@Override
public void processElement(Order o, Context ctx, Collector<...> out) throws Exception {
if (o.status == OrderStatus.PENDING) {
pending.put(o.orderId, o);
} else if (o.status == OrderStatus.COMPLETED) {
pending.remove(o.orderId);
}
// А если event COMPLETED никогда не приходит? Order висит в state вечно.
}
}
Если бизнес-событие, “закрывающее” state, может быть потеряно, missed или просто не сгенерировано — state не очистится. Эта проблема не решается TTL: TTL обновляется при каждом put, и orderId продолжает “жить” с каждым новым event для того же user.
Правильно: всегда регистрируйте watchdog-timer на момент создания PENDING:
if (o.status == OrderStatus.PENDING) {
pending.put(o.orderId, o);
long deadline = ctx.timerService().currentProcessingTime() + Duration.ofHours(24).toMillis();
ctx.timerService().registerProcessingTimeTimer(deadline);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, ...) throws Exception {
// Стерили все pending, чей deadline истёк
Iterator<Map.Entry<String, Order>> it = pending.entries().iterator();
while (it.hasNext()) {
Map.Entry<String, Order> e = it.next();
if (timestamp - e.getValue().createdAt >= Duration.ofHours(24).toMillis()) {
it.remove();
}
}
}
Размер state: чек-лист перед production
- Cardinality ключа. Сколько уникальных значений ключа ожидается за 1 час, 1 день, 30 дней? Если ответ “может быть unbounded” — нужно TTL или явный cleanup.
- Размер state per key. Грубая оценка: средний размер value * количество ключей = total state. Если получается > 100 GB на subtask — пора пересматривать архитектуру (sharding, downsampling).
- TTL. Каждый keyBy с длинным lifetime должен иметь явный TTL. Хорошее значение для unbounded scenarios — 7-30 дней с запасом.
- Schema evolution. Если value будет меняться в будущем — Avro/Protobuf, не raw POJO. См. State Processor API для migration.
- Backend. Heap — для малого state (
< 1 GB). RocksDB — для всего остального.
Самая дорогая ошибка проектирования Flink-job — это решение использовать keyed state с unbounded cardinality без cleanup. Это видно не сразу: job стабильно работает неделю, потом две, потом начинает медленно деградировать, и через месяц вы получаете OOM в 3 утра. Лучше переподумать архитектуру в начале, чем потом героически чинить.
Попробуй сам
-
Деduplication scale test. Запустите дедупликатор на 100K событий/секунду с уникальными event_id. Без TTL — наблюдайте за ростом state. С TTL 5 минут — убедитесь, что state стабилизируется.
-
Hot key detection. Сгенерируйте поток, где 50% событий имеют user_id = “popular”. Запустите простой keyBy(userId) с counter. В Web UI посмотрите на subtask-метрики — должна быть видна сильно загруженная одна задача. Примените salt-and-reduce паттерн и сравните.
-
Sessionization vs Session Window. Реализуйте sessionization вручную через keyed state (см. паттерн 3) и через стандартный SessionWindow. Сравните: какой подход легче дополнить кастомной логикой “закрыть сессию при logout-событии”.