Learning Platform
Глоссарий Troubleshooting
Урок 06.02 · 18 мин
Средний
Session WindowsGap-BasedSession MergingDynamic GapUser Sessions

Session windows

В отличие от tumbling и sliding окон, session windows не имеют фиксированной длительности. Они формируются на основе gap-а активности: пока события приходят друг за другом достаточно часто, они принадлежат одной сессии.

Sessionization в Kafka Streams Как только наступает “тишина” длиннее gap-а — сессия закрывается.

Этот тип окон близок к интуитивному пониманию “сессия пользователя”: активность на сайте делится на естественные периоды взаимодействия, разделённые паузами на еду, сон или переключение задач.


Базовое использование

DataStream<UserEvent> events = ...;

DataStream<Session> sessions = events
    .keyBy(UserEvent::getUserId)
    .window(EventTimeSessionWindows.withGap(Duration.ofMinutes(30)))
    .aggregate(new SessionAggregator());

Каждый event с одним user_id попадает в session window. Окно растёт по мере поступления новых событий и закрывается, когда watermark проходит за момент last_event + gap.

В Python:

from pyflink.datastream.window import EventTimeSessionWindows
from pyflink.common import Duration

sessions = events \
    .key_by(lambda e: e.user_id) \
    .window(EventTimeSessionWindows.with_gap(Duration.of_minutes(30))) \
    .aggregate(SessionAggregator())

Как формируется сессия

Уникальность session windows — в их динамической природе. В отличие от tumbling/sliding, где границы окон известны заранее, у session window granular determined by data:

Session window: формирование и merge
gap=30min. Каждое событие изначально создаёт собственное мини-окно [t, t+30min). Окна объединяются, если перекрываются.
Первое событие создаёт окно [10:00, 10:30). Эта 'protowindow' существует, пока не появится другое событие или не сработает trigger.
e2 at 10:15
e2 создаёт окно [10:15, 10:45). Оно перекрывается с [10:00, 10:30) — Flink объединяет их в [10:00, 10:45). Все события и state переезжают в объединённое окно.
e3 создаёт [10:42, 11:12). Перекрытие с [10:00, 10:45) — merge в [10:00, 11:12). Окно продолжает расти.
watermark 11:13
Watermark прошёл за 11:12 — окно закрывается. SessionAggregator вычисляет финальный результат и emit-ит. State очищается.

Алгоритм формирования сессии:

  1. Каждое событие изначально создаёт собственное “мини-окно” [event_time, event_time + gap).
  2. Если новое мини-окно перекрывается с существующим — Flink выполняет merge: окна объединяются, state одного переносится в другое.
  3. Когда watermark проходит за end окна — окно закрывается и emit-ится.

Это означает, что в момент обработки события Flink может выполнить merge сразу нескольких окон: представьте сценарий, где e1 at 10:00, e2 at 10:50 (создали два независимых окна), и потом приходит e3 at 10:25. Окно [10:25, 10:55) объединит оба предыдущих в [10:00, 11:20).


Mergeable state

Чтобы merge работал, state внутри сессии должен поддерживать объединение. Это накладывает ограничения:

Можно использовать:

  • ReduceFunction — две сессии объединяются вызовом reduce(a, b).
  • AggregateFunction — accumulators объединяются методом merge(acc1, acc2).
  • ProcessWindowFunction — Flink буферизует элементы, при merge они сливаются в один список.

Нельзя без оговорок:

  • Кастомный ProcessWindowFunction с явной агрегацией, требующей знания order событий — порядок при merge не сохраняется.
  • State из getRuntimeContext().getState() внутри window function — это keyed state, не window state, и merge для него не определён.
public class ClicksAggregator implements AggregateFunction<Click, ClicksAcc, SessionStats> {
    @Override
    public ClicksAcc createAccumulator() {
        return new ClicksAcc(0, Long.MAX_VALUE, Long.MIN_VALUE);
    }

    @Override
    public ClicksAcc add(Click c, ClicksAcc acc) {
        return new ClicksAcc(acc.count + 1,
                             Math.min(acc.startTs, c.timestamp),
                             Math.max(acc.endTs, c.timestamp));
    }

    @Override
    public SessionStats getResult(ClicksAcc acc) {
        return new SessionStats(acc.count, acc.endTs - acc.startTs);
    }

    @Override
    public ClicksAcc merge(ClicksAcc a, ClicksAcc b) {
        return new ClicksAcc(
            a.count + b.count,
            Math.min(a.startTs, b.startTs),
            Math.max(a.endTs, b.endTs)
        );
    }
}

Метод merge критически важен для session windows: без корректного merge данные двух сессий потеряются при объединении.


Dynamic gap: gap зависит от данных

В реальных сценариях gap может варьироваться. Например, для премиум-пользователей “сессия” может быть до часа, для гостевых — 10 минут. Или для разных категорий устройств (мобильные vs десктоп) разные интервалы.

Flink предоставляет DynamicEventTimeSessionWindows:

DataStream<UserEvent> events = ...;

events.keyBy(UserEvent::getUserId)
    .window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<UserEvent>() {
        @Override
        public long extract(UserEvent ev) {
            return ev.isPremium ? Duration.ofHours(1).toMillis() : Duration.ofMinutes(10).toMillis();
        }
    }))
    .aggregate(new ClicksAggregator());

Gap вычисляется на каждое событие. Если разные события одной сессии возвращают разный gap, Flink использует gap текущего события для определения границы.

WARNING

Dynamic gap кажется удобным, но усложняет дебаг. Сессии могут вести себя непредсказуемо: одно событие с gap=10min создаёт короткое окно, следующее с gap=1hour может ретроактивно расширить его через merge. Используйте только когда есть чёткая бизнес-логика — иначе предпочтительнее фиксированный gap с post-processing в downstream.


Session windows и параллелизм

Каждая сессия принадлежит одному ключу — благодаря этому Flink может обрабатывать сессии разных пользователей параллельно. Внутри одного ключа merge — это последовательная операция: окно для user_id=U-42 не может быть merged с окном для user_id=U-87 (это разные сессии).

Производительность session windows сильно зависит от шаблона событий:

  • Sparse activity (один user — несколько событий в час) — окна редко merge-ятся, performance близок к обычным windows.
  • Bursty activity (тысячи событий быстро подряд) — много merge-операций, дополнительная работа state-backend.
  • Long sessions (часы, десятки тысяч событий в одном окне) — большое window state, медленное закрытие.

Поздние события и session merging

Поздние события (late events — за пределами watermark) могут не просто попасть в существующее окно, а слить два уже закрытых окна или открыть новое.

Пример: окна [10:00, 10:25) (закрыто на 10:55) и [11:30, 11:55) (закрыто на 12:25). Приходит late event с timestamp 10:45. Без allowed lateness это event дропается. С .allowedLateness(Duration.ofHours(2)) Flink:

  1. Создаёт временное окно [10:45, 11:15).
  2. Обнаруживает перекрытие с [10:00, 10:25) (если ещё в state, благодаря allowed lateness).
  3. Merge: получается [10:00, 11:15).
  4. Может перекрыться с [11:30, 11:55) — ещё один merge до [10:00, 11:55).
  5. Эмиттит обновлённый результат с включением late event и объединённого окна.

Это поведение требует, чтобы предыдущие сессии оставались в state допустимое время (allowed lateness). Бесконтрольное увеличение allowed lateness — путь к огромному window state.


Когда session windows — правильный выбор

  • User behavior analytics: clickstreams, дополнительные действия в рамках одного визита.
  • IoT device sessions: device активен, пока шлёт телеметрию; gap определяет “ушёл из эфира”.
  • Conversation threads: messages в чате группируются в conversations по времени.

Когда session windows плохо подходят:

  • Регулярная отчётность (hourly revenue) — tumbling.
  • Точные SLA (alert if avg > X for last 5 min) — sliding с фиксированным окном.
  • Counting things per period — tumbling/sliding.

Сравнение с ручной sessionization через ProcessFunction

В предыдущем модуле (Урок 04: keyed state patterns) мы реализовали sessionization вручную через KeyedProcessFunction с registerEventTimeTimer. Когда что выбрать?

Стандартный SessionWindow проще для случаев “просто gap + агрегат”:

  • Меньше кода.
  • Merge встроен.
  • Allowed lateness работает из коробки.

KeyedProcessFunction нужен когда:

  • Бизнес-событие закрывает сессию досрочно (logout, payment_done).
  • Нужна custom merge-логика, не вписывающаяся в AggregateFunction.merge.
  • Multi-stage sessions: одна сессия — последовательность мини-сессий с переходами.
  • Точный контроль над emit (например, периодические интерим-emit-ы внутри сессии).

В сомнительных случаях начинайте со стандартного SessionWindow — это меньше кода и проще понимать. Переходите на ProcessFunction, когда упрётесь в реальные ограничения.


Production-чеклист

  • Watermarks правильно настроены — без них SessionWindow никогда не закроется.
  • AggregateFunction.merge реализован корректно — без него merge сессий потеряет данные.
  • Размер state мониторится — длинные сессии (десятки часов) могут хранить много данных.
  • Allowed lateness разумен — слишком много = огромный state; слишком мало = поздние события дропаются.
  • Backend = RocksDB — для серьёзного объёма sessions heap не выдержит.

Попробуй сам

  1. Session merging visualization. Сгенерируйте поток событий с user_id=U-1 и timestamps 10:00, 10:15, 10:50, 11:25. С gap=30min должны получиться две сессии. Теперь добавьте событие 11:00 — все четыре события должны слиться в одну сессию через merge.

  2. Long session experiment. Создайте поток, где у user_id=U-1 события приходят каждую минуту в течение 24 часов. Замерьте размер window state на закрытии. Это иллюстрирует, как долгие сессии накапливают данные.

  3. Dynamic gap test. Реализуйте dynamic gap, где gap=60s для категории “browse” и gap=600s для “purchase”. Прогоните смешанный поток и наблюдайте, как разные категории формируют разные размеры сессий.

Проверка знанийKnowledge check
У вас Flink-job с EventTimeSessionWindows.withGap(30min) и AggregateFunction для подсчёта количества событий per сессия. После релиза вы заметили, что в выходных результатах часто фигурируют сессии с количеством событий 0. Что наиболее вероятная причина?
ОтветAnswer
Если в AggregateFunction не реализован метод merge() или он реализован некорректно (например, возвращает createAccumulator() вместо суммы), то при объединении двух сессий через session merging один из accumulators теряется. Конкретно: когда новое событие триггерит merge двух существующих окон, Flink вызывает merge(acc1, acc2). Если эта функция возвращает 'пустой' accumulator или обнуляет один из counters, при дальнейших add() будут считаться только новые события — а уже накопленные в одном из окон исчезнут. Симптом 'count = 0 в финальном результате' — частное проявление: вся сессия может состоять из событий, которые попали в окно, потом окно merge с другим, и из-за бага в merge все они потерялись. Решение: убедиться, что merge(a, b) корректно суммирует/объединяет accumulators. Это особенно критично для session windows; для tumbling/sliding merge редко вызывается.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 5. У вас Flink-job с EventTimeSessionWindows.withGap(30min) и custom AggregateFunction для подсчёта количества кликов в сессии. После релиза появились session-результаты с count=0. Какая наиболее вероятная причина?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5