Learning Platform
Глоссарий Troubleshooting
Урок 05.04 · 22 мин
Средний
State PatternsSessionizationDeduplicationAlert StateAnti-PatternsCardinality

Keyed state: паттерны и анти-паттерны

Понимание API ValueState/ListState/MapState — это базис. Но реальные production-jobs строятся из узнаваемых паттернов: дедупликация, sessionization, alerting, FSM. Каждый паттерн имеет свои ловушки — и каждая ловушка хорошо известна тем, кто уже однажды сжёг 50 GB state на проде.

В этом уроке — четыре рабочих паттерна и четыре анти-паттерна, которые встречаются в большинстве code review.


Паттерн 1: дедупликация

Задача: на потоке событий с возможными дубликатами (например, at-least-once delivery из Kafka) пропускать только первое появление каждого event_id.

public class Deduplicator extends RichFilterFunction<Event> {
    private transient ValueState<Boolean> seen;

    @Override
    public void open(OpenContext ctx) {
        ValueStateDescriptor<Boolean> desc =
            new ValueStateDescriptor<>("seen", Boolean.class);
        desc.enableTimeToLive(StateTtlConfig
            .newBuilder(Time.hours(24))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .cleanupInRocksdbCompactFilter(1000)
            .build());
        seen = getRuntimeContext().getState(desc);
    }

    @Override
    public boolean filter(Event event) throws Exception {
        if (seen.value() != null) return false;
        seen.update(true);
        return true;
    }
}

events.keyBy(Event::getEventId)
      .filter(new Deduplicator());

Что важно:

  • keyBy(eventId) означает миллиарды ключей. Без TTL это превратится в безразмерный set.
  • TTL должен быть больше потенциального окна дубликатов. Если at-least-once Kafka может задержать дубликат на 1 час максимум — ставьте TTL 24 часа с запасом.
  • Boolean как тип — формальность; вам не нужно значение, только сам факт ключа. Альтернатива — пустой ValueState<Byte> для минимизации overhead.
WARNING

Дедупликация через keyBy(eventId) — это правильно, но cardinality eventId часто сильно недооценивают. Прикиньте: 10K событий в секунду * 86400 секунд * 24 часа TTL = 20.7 миллиарда ключей в RocksDB. С учётом overhead (timestamp + framework) — терабайты state. Для high-throughput pipelines рассмотрите probabilistic дедупликацию через Bloom filter в state.


Паттерн 2: alert state с гистерезисом

Задача: оператор должен генерировать alert, когда метрика пользователя превышает порог, и НЕ слать повторные alerts, пока не было recovery.

public class ThresholdAlert extends RichFlatMapFunction<Metric, Alert> {
    private transient ValueState<Boolean> alertActive;

    @Override
    public void open(OpenContext ctx) {
        alertActive = getRuntimeContext().getState(
            new ValueStateDescriptor<>("alert-active", Boolean.class));
    }

    @Override
    public void flatMap(Metric m, Collector<Alert> out) throws Exception {
        Boolean active = alertActive.value();
        if (m.value > 100 && active == null) {
            out.collect(new Alert(m.userId, "THRESHOLD_EXCEEDED", m.value));
            alertActive.update(true);
        } else if (m.value < 80 && active != null) {
            out.collect(new Alert(m.userId, "RECOVERED", m.value));
            alertActive.clear();
        }
    }
}

Это FSM с двумя состояниями: “normal” (state == null) и “alerting” (state == true). Гистерезис — разрыв между порогом срабатывания (100) и порогом recovery (80) — предотвращает flapping на пограничных значениях.

Паттерн обобщается на любые FSM. State хранит текущее состояние; transitions определяются логикой flatMap/processElement.

TIP

Для multi-state FSM используйте enum вместо boolean: ValueState<UserStatus>, где UserStatus = ACTIVE, SUSPENDED, BANNED, etc. Это делает код самодокументирующимся и предотвращает магию “null = специальное состояние”.


Паттерн 3: sessionization вручную

Session window (см. модуль про windows) автоматически группирует события в сессии по gap-у. Но иногда нужна кастомная логика: переменный gap, обрыв сессии по бизнес-событию, multi-stage сессии. В таких случаях sessionization делается через keyed state.

public class SessionTracker extends KeyedProcessFunction<String, Event, Session> {
    private transient ValueState<SessionMeta> currentSession;
    private transient ListState<Event> sessionEvents;

    @Override
    public void open(OpenContext ctx) {
        currentSession = getRuntimeContext().getState(
            new ValueStateDescriptor<>("session-meta", SessionMeta.class));
        sessionEvents = getRuntimeContext().getListState(
            new ListStateDescriptor<>("session-events", Event.class));
    }

    @Override
    public void processElement(Event ev, Context ctx, Collector<Session> out) throws Exception {
        SessionMeta meta = currentSession.value();
        long gapMs = 30 * 60 * 1000;

        if (meta == null) {
            // Новая сессия
            meta = new SessionMeta(ev.timestamp, ev.timestamp);
            sessionEvents.add(ev);
            currentSession.update(meta);
            ctx.timerService().registerEventTimeTimer(ev.timestamp + gapMs);
        } else if (ev.timestamp - meta.lastSeen > gapMs) {
            // Gap превышен — закрываем старую, открываем новую
            emitSession(meta, out);
            sessionEvents.clear();
            meta = new SessionMeta(ev.timestamp, ev.timestamp);
            sessionEvents.add(ev);
            currentSession.update(meta);
            ctx.timerService().registerEventTimeTimer(ev.timestamp + gapMs);
        } else {
            // Продолжение сессии
            sessionEvents.add(ev);
            meta.lastSeen = ev.timestamp;
            currentSession.update(meta);
            ctx.timerService().registerEventTimeTimer(ev.timestamp + gapMs);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Session> out) throws Exception {
        SessionMeta meta = currentSession.value();
        if (meta != null && timestamp >= meta.lastSeen + 30 * 60 * 1000) {
            emitSession(meta, out);
            sessionEvents.clear();
            currentSession.clear();
        }
    }

    private void emitSession(SessionMeta meta, Collector<Session> out) throws Exception {
        List<Event> events = new ArrayList<>();
        sessionEvents.get().forEach(events::add);
        out.collect(new Session(meta.start, meta.lastSeen, events));
    }
}

Зачем это вместо стандартного session window?

  • Контроль над дополнительной логикой “закрыть сессию”: например, при появлении события logout или payment_completed.
  • Возможность дробить сессии по бизнес-правилам (одна сессия = один заказ, а не gap-30min).
  • Custom метаданные сессии (счётчики событий по типу, агрегаты на лету), которые не вписываются в WindowFunction.

Паттерн 4: top-K через MapState

Задача: для каждого пользователя поддерживать top-10 самых популярных категорий.

public class TopK extends KeyedProcessFunction<String, Event, TopKReport> {
    private transient MapState<String, Long> categoryCounts;
    private static final int K = 10;

    @Override
    public void open(OpenContext ctx) {
        categoryCounts = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("cat-counts", Types.STRING, Types.LONG));
    }

    @Override
    public void processElement(Event ev, Context ctx, Collector<TopKReport> out) throws Exception {
        Long c = categoryCounts.get(ev.category);
        categoryCounts.put(ev.category, (c == null ? 0 : c) + 1);

        // Раз в N событий — пересчёт top-K
        if (ev.timestamp % 100 == 0) {
            PriorityQueue<Map.Entry<String, Long>> heap = new PriorityQueue<>(
                (a, b) -> Long.compare(a.getValue(), b.getValue())
            );
            for (Map.Entry<String, Long> entry : categoryCounts.entries()) {
                heap.offer(entry);
                if (heap.size() > K) heap.poll();
            }
            List<Map.Entry<String, Long>> topK = new ArrayList<>(heap);
            topK.sort((a, b) -> Long.compare(b.getValue(), a.getValue()));
            out.collect(new TopKReport(ev.userId, topK));
        }
    }
}

Тонкости:

  • MapState.entries() — итерация по всему словарю, O(N) от размера мапы. Не делайте на каждом событии, если категорий тысячи.
  • Для high-cardinality (миллион категорий) heap-based top-K превращается в bottleneck. Альтернатива: храните top-K explicit в отдельном ValueState<TreeMap> и обновляйте инкрементально (но это сложнее).
  • При rescale всё state поедет на новый subtask “цельным куском” — гарантируется консистентность top-K.

Анти-паттерн 1: unbounded GROUP BY

events.keyBy(e -> e.transactionId)  // никогда не повторяется
      .reduce(...)

transaction_id уникален per event, и keyBy(transactionId) создаёт ключ на каждое событие. После 1 миллиарда событий — 1 миллиард keys в state, без возможности reuse.

Признак того, что вы попали в эту ловушку: state monotonically растёт, и в Web UI у вас по 10K-100K новых keys в минуту. Если cardinality ключа теоретически unbounded, нужно либо TTL, либо радикальный пересмотр архитектуры:

  • Дедупликация — окей с TTL.
  • Аналитика per транзакция — обычно НЕ нужно state, можно просто flatMap без keyBy.
  • Сессионизация — keyBy(user_id), а transaction_id внутри payload.

Анти-паттерн 2: big state per key

public class History extends RichFlatMapFunction<Event, Anomaly> {
    private transient ValueState<List<Event>> fullHistory;  // ВСЯ история пользователя!

    @Override
    public void flatMap(Event e, Collector<Anomaly> out) throws Exception {
        List<Event> hist = fullHistory.value();
        if (hist == null) hist = new ArrayList<>();
        hist.add(e);
        fullHistory.update(hist);
        // ... анализ всей истории
    }
}

Каждое событие — это read + append + write всей истории целиком. При истории в 10000 событий каждое новое событие читает и пишет ~10000 объектов в RocksDB. CPU и I/O пропорциональны размеру state, не количеству новых событий.

Правильно:

  • Если нужно “последние N” — используйте sliding window или явный circular buffer (ListState с обрезкой при достижении N).
  • Если нужны агрегаты — храните агрегаты, а не raw данные (mean, variance, exponential moving average).
  • Если нужен random access по items — MapState вместо ValueState<List>.
Анти-паттерн: big state per key
ValueState`<List<Event>>` — Flink сериализует весь список при каждом обращении. Для RocksDB это означает прочитать blob, десериализовать, изменить, сериализовать обратно — на каждое событие.
vs
MapState`<Long, Event>` — каждый event хранится отдельной парой. Добавление O(1) на RocksDB, чтение конкретного event O(log) по индексу.

Анти-паттерн 3: hot key

events.keyBy(e -> e.userId.startsWith("guest") ? "guest" : e.userId)

Все гостевые сессии попадают на одну задачу. State для ключа "guest" будет огромным, CPU subtask-а 0 — на 100%, остальные простаивают.

Решение — двухступенчатая агрегация:

// Этап 1: дробим guest на 32 виртуальных ключа
events
    .keyBy(e -> e.userId.startsWith("guest")
        ? "guest-" + (Math.abs(e.sessionId.hashCode()) % 32)
        : e.userId)
    .timeWindow(Time.minutes(1))
    .reduce(...)
    // Этап 2: собираем guest-* обратно в один
    .keyBy(metric -> metric.userId.startsWith("guest-") ? "guest" : metric.userId)
    .reduce(...)

Этот паттерн распространён в Spark, Flink, Kafka Streams под названием salt + reduce.

State stores в Kafka Streams: аналогичный keyed state

Анти-паттерн 4: state без watermark/cleanup в Process Function

public class PendingOrders extends KeyedProcessFunction<String, Order, ...> {
    private transient MapState<String, Order> pending;  // orderId -> Order

    @Override
    public void processElement(Order o, Context ctx, Collector<...> out) throws Exception {
        if (o.status == OrderStatus.PENDING) {
            pending.put(o.orderId, o);
        } else if (o.status == OrderStatus.COMPLETED) {
            pending.remove(o.orderId);
        }
        // А если event COMPLETED никогда не приходит? Order висит в state вечно.
    }
}

Если бизнес-событие, “закрывающее” state, может быть потеряно, missed или просто не сгенерировано — state не очистится. Эта проблема не решается TTL: TTL обновляется при каждом put, и orderId продолжает “жить” с каждым новым event для того же user.

Правильно: всегда регистрируйте watchdog-timer на момент создания PENDING:

if (o.status == OrderStatus.PENDING) {
    pending.put(o.orderId, o);
    long deadline = ctx.timerService().currentProcessingTime() + Duration.ofHours(24).toMillis();
    ctx.timerService().registerProcessingTimeTimer(deadline);
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, ...) throws Exception {
    // Стерили все pending, чей deadline истёк
    Iterator<Map.Entry<String, Order>> it = pending.entries().iterator();
    while (it.hasNext()) {
        Map.Entry<String, Order> e = it.next();
        if (timestamp - e.getValue().createdAt >= Duration.ofHours(24).toMillis()) {
            it.remove();
        }
    }
}

Размер state: чек-лист перед production

  • Cardinality ключа. Сколько уникальных значений ключа ожидается за 1 час, 1 день, 30 дней? Если ответ “может быть unbounded” — нужно TTL или явный cleanup.
  • Размер state per key. Грубая оценка: средний размер value * количество ключей = total state. Если получается > 100 GB на subtask — пора пересматривать архитектуру (sharding, downsampling).
  • TTL. Каждый keyBy с длинным lifetime должен иметь явный TTL. Хорошее значение для unbounded scenarios — 7-30 дней с запасом.
  • Schema evolution. Если value будет меняться в будущем — Avro/Protobuf, не raw POJO. См. State Processor API для migration.
  • Backend. Heap — для малого state (< 1 GB). RocksDB — для всего остального.
WARNING

Самая дорогая ошибка проектирования Flink-job — это решение использовать keyed state с unbounded cardinality без cleanup. Это видно не сразу: job стабильно работает неделю, потом две, потом начинает медленно деградировать, и через месяц вы получаете OOM в 3 утра. Лучше переподумать архитектуру в начале, чем потом героически чинить.


Попробуй сам

  1. Деduplication scale test. Запустите дедупликатор на 100K событий/секунду с уникальными event_id. Без TTL — наблюдайте за ростом state. С TTL 5 минут — убедитесь, что state стабилизируется.

  2. Hot key detection. Сгенерируйте поток, где 50% событий имеют user_id = “popular”. Запустите простой keyBy(userId) с counter. В Web UI посмотрите на subtask-метрики — должна быть видна сильно загруженная одна задача. Примените salt-and-reduce паттерн и сравните.

  3. Sessionization vs Session Window. Реализуйте sessionization вручную через keyed state (см. паттерн 3) и через стандартный SessionWindow. Сравните: какой подход легче дополнить кастомной логикой “закрыть сессию при logout-событии”.

Проверка знанийKnowledge check
У вас job, который для каждого user_id хранит MapState<orderId, OrderDetails>. Orders добавляются как PENDING, потом удаляются при появлении event-а с тем же orderId и status=COMPLETED. Несколько недель всё работало, потом state начал монотонно расти. В чём вероятная проблема и как починить?
ОтветAnswer
Проблема: для части orderId события со status=COMPLETED никогда не приходят (баг в upstream, потерянное сообщение, удалённый продавец, частичный failure). Эти orders остаются в state навсегда, поскольку никакой логики их очистки нет, и TTL на MapState всегда сбрасывается при любом put для того же user_id. Решение: при создании PENDING сразу регистрировать processing-time или event-time timer на N часов вперёд (например 48 часов как maximum business deadline). В onTimer проверять каждый order — если по нему до сих пор статус PENDING и timestamp превышает порог, удалять через mapState.remove(orderId) и опционально отправлять в side output для алертинга бизнес-команды. Это превращает потенциально бесконечный state в state с гарантированной верхней границей по времени жизни записи.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 5. Команда написала Flink-job, который для каждого user_id хранит полную историю событий в ValueState<List<Event>> и при каждом новом событии анализирует всю историю. После 2 недель работы latency обработки одного события выросла с 1ms до 800ms. Какова основная причина?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4