Session windows
В отличие от tumbling и sliding окон, session windows не имеют фиксированной длительности. Они формируются на основе gap-а активности: пока события приходят друг за другом достаточно часто, они принадлежат одной сессии.
Sessionization в Kafka Streams Как только наступает “тишина” длиннее gap-а — сессия закрывается.
Этот тип окон близок к интуитивному пониманию “сессия пользователя”: активность на сайте делится на естественные периоды взаимодействия, разделённые паузами на еду, сон или переключение задач.
Базовое использование
DataStream<UserEvent> events = ...;
DataStream<Session> sessions = events
.keyBy(UserEvent::getUserId)
.window(EventTimeSessionWindows.withGap(Duration.ofMinutes(30)))
.aggregate(new SessionAggregator());
Каждый event с одним user_id попадает в session window. Окно растёт по мере поступления новых событий и закрывается, когда watermark проходит за момент last_event + gap.
В Python:
from pyflink.datastream.window import EventTimeSessionWindows
from pyflink.common import Duration
sessions = events \
.key_by(lambda e: e.user_id) \
.window(EventTimeSessionWindows.with_gap(Duration.of_minutes(30))) \
.aggregate(SessionAggregator())
Как формируется сессия
Уникальность session windows — в их динамической природе. В отличие от tumbling/sliding, где границы окон известны заранее, у session window granular determined by data:
Алгоритм формирования сессии:
- Каждое событие изначально создаёт собственное “мини-окно”
[event_time, event_time + gap). - Если новое мини-окно перекрывается с существующим — Flink выполняет merge: окна объединяются, state одного переносится в другое.
- Когда watermark проходит за
endокна — окно закрывается и emit-ится.
Это означает, что в момент обработки события Flink может выполнить merge сразу нескольких окон: представьте сценарий, где e1 at 10:00, e2 at 10:50 (создали два независимых окна), и потом приходит e3 at 10:25. Окно [10:25, 10:55) объединит оба предыдущих в [10:00, 11:20).
Mergeable state
Чтобы merge работал, state внутри сессии должен поддерживать объединение. Это накладывает ограничения:
Можно использовать:
ReduceFunction— две сессии объединяются вызовом reduce(a, b).AggregateFunction— accumulators объединяются методомmerge(acc1, acc2).ProcessWindowFunction— Flink буферизует элементы, при merge они сливаются в один список.
Нельзя без оговорок:
- Кастомный
ProcessWindowFunctionс явной агрегацией, требующей знания order событий — порядок при merge не сохраняется. - State из
getRuntimeContext().getState()внутри window function — это keyed state, не window state, и merge для него не определён.
public class ClicksAggregator implements AggregateFunction<Click, ClicksAcc, SessionStats> {
@Override
public ClicksAcc createAccumulator() {
return new ClicksAcc(0, Long.MAX_VALUE, Long.MIN_VALUE);
}
@Override
public ClicksAcc add(Click c, ClicksAcc acc) {
return new ClicksAcc(acc.count + 1,
Math.min(acc.startTs, c.timestamp),
Math.max(acc.endTs, c.timestamp));
}
@Override
public SessionStats getResult(ClicksAcc acc) {
return new SessionStats(acc.count, acc.endTs - acc.startTs);
}
@Override
public ClicksAcc merge(ClicksAcc a, ClicksAcc b) {
return new ClicksAcc(
a.count + b.count,
Math.min(a.startTs, b.startTs),
Math.max(a.endTs, b.endTs)
);
}
}
Метод merge критически важен для session windows: без корректного merge данные двух сессий потеряются при объединении.
Dynamic gap: gap зависит от данных
В реальных сценариях gap может варьироваться. Например, для премиум-пользователей “сессия” может быть до часа, для гостевых — 10 минут. Или для разных категорий устройств (мобильные vs десктоп) разные интервалы.
Flink предоставляет DynamicEventTimeSessionWindows:
DataStream<UserEvent> events = ...;
events.keyBy(UserEvent::getUserId)
.window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<UserEvent>() {
@Override
public long extract(UserEvent ev) {
return ev.isPremium ? Duration.ofHours(1).toMillis() : Duration.ofMinutes(10).toMillis();
}
}))
.aggregate(new ClicksAggregator());
Gap вычисляется на каждое событие. Если разные события одной сессии возвращают разный gap, Flink использует gap текущего события для определения границы.
Dynamic gap кажется удобным, но усложняет дебаг. Сессии могут вести себя непредсказуемо: одно событие с gap=10min создаёт короткое окно, следующее с gap=1hour может ретроактивно расширить его через merge. Используйте только когда есть чёткая бизнес-логика — иначе предпочтительнее фиксированный gap с post-processing в downstream.
Session windows и параллелизм
Каждая сессия принадлежит одному ключу — благодаря этому Flink может обрабатывать сессии разных пользователей параллельно. Внутри одного ключа merge — это последовательная операция: окно для user_id=U-42 не может быть merged с окном для user_id=U-87 (это разные сессии).
Производительность session windows сильно зависит от шаблона событий:
- Sparse activity (один user — несколько событий в час) — окна редко merge-ятся, performance близок к обычным windows.
- Bursty activity (тысячи событий быстро подряд) — много merge-операций, дополнительная работа state-backend.
- Long sessions (часы, десятки тысяч событий в одном окне) — большое window state, медленное закрытие.
Поздние события и session merging
Поздние события (late events — за пределами watermark) могут не просто попасть в существующее окно, а слить два уже закрытых окна или открыть новое.
Пример: окна [10:00, 10:25) (закрыто на 10:55) и [11:30, 11:55) (закрыто на 12:25). Приходит late event с timestamp 10:45. Без allowed lateness это event дропается. С .allowedLateness(Duration.ofHours(2)) Flink:
- Создаёт временное окно
[10:45, 11:15). - Обнаруживает перекрытие с
[10:00, 10:25)(если ещё в state, благодаря allowed lateness). - Merge: получается
[10:00, 11:15). - Может перекрыться с
[11:30, 11:55)— ещё один merge до[10:00, 11:55). - Эмиттит обновлённый результат с включением late event и объединённого окна.
Это поведение требует, чтобы предыдущие сессии оставались в state допустимое время (allowed lateness). Бесконтрольное увеличение allowed lateness — путь к огромному window state.
Когда session windows — правильный выбор
- User behavior analytics: clickstreams, дополнительные действия в рамках одного визита.
- IoT device sessions: device активен, пока шлёт телеметрию; gap определяет “ушёл из эфира”.
- Conversation threads: messages в чате группируются в conversations по времени.
Когда session windows плохо подходят:
- Регулярная отчётность (hourly revenue) — tumbling.
- Точные SLA (
alert if avg > X for last 5 min) — sliding с фиксированным окном. - Counting things per period — tumbling/sliding.
Сравнение с ручной sessionization через ProcessFunction
В предыдущем модуле (Урок 04: keyed state patterns) мы реализовали sessionization вручную через KeyedProcessFunction с registerEventTimeTimer. Когда что выбрать?
Стандартный SessionWindow проще для случаев “просто gap + агрегат”:
- Меньше кода.
- Merge встроен.
- Allowed lateness работает из коробки.
KeyedProcessFunction нужен когда:
- Бизнес-событие закрывает сессию досрочно (
logout,payment_done). - Нужна custom merge-логика, не вписывающаяся в
AggregateFunction.merge. - Multi-stage sessions: одна сессия — последовательность мини-сессий с переходами.
- Точный контроль над emit (например, периодические интерим-emit-ы внутри сессии).
В сомнительных случаях начинайте со стандартного SessionWindow — это меньше кода и проще понимать. Переходите на ProcessFunction, когда упрётесь в реальные ограничения.
Production-чеклист
- Watermarks правильно настроены — без них SessionWindow никогда не закроется.
- AggregateFunction.merge реализован корректно — без него merge сессий потеряет данные.
- Размер state мониторится — длинные сессии (десятки часов) могут хранить много данных.
- Allowed lateness разумен — слишком много = огромный state; слишком мало = поздние события дропаются.
- Backend = RocksDB — для серьёзного объёма sessions heap не выдержит.
Попробуй сам
-
Session merging visualization. Сгенерируйте поток событий с user_id=U-1 и timestamps 10:00, 10:15, 10:50, 11:25. С gap=30min должны получиться две сессии. Теперь добавьте событие 11:00 — все четыре события должны слиться в одну сессию через merge.
-
Long session experiment. Создайте поток, где у user_id=U-1 события приходят каждую минуту в течение 24 часов. Замерьте размер window state на закрытии. Это иллюстрирует, как долгие сессии накапливают данные.
-
Dynamic gap test. Реализуйте dynamic gap, где gap=60s для категории “browse” и gap=600s для “purchase”. Прогоните смешанный поток и наблюдайте, как разные категории формируют разные размеры сессий.